Merge pull request #108496 from khenidak/remove-port-opener

kube-proxy: remove port opener
This commit is contained in:
Kubernetes Prow Robot 2022-03-16 03:01:49 -07:00 committed by GitHub
commit 7152825c06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 3 additions and 209 deletions

View File

@ -190,7 +190,6 @@ type Proxier struct {
mu sync.Mutex // protects the following fields mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap endpointsMap proxy.EndpointsMap
portsMap map[netutils.LocalPort]netutils.Closeable
nodeLabels map[string]string nodeLabels map[string]string
// endpointSlicesSynced, and servicesSynced are set to true // endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid // when corresponding objects are synced after startup. This is used to avoid
@ -209,7 +208,6 @@ type Proxier struct {
localDetector proxyutiliptables.LocalTrafficDetector localDetector proxyutiliptables.LocalTrafficDetector
hostname string hostname string
nodeIP net.IP nodeIP net.IP
portMapper netutils.PortOpener
recorder events.EventRecorder recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
@ -299,7 +297,6 @@ func NewProxier(ipt utiliptables.Interface,
} }
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
@ -312,7 +309,6 @@ func NewProxier(ipt utiliptables.Interface,
localDetector: localDetector, localDetector: localDetector,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
portMapper: &netutils.ListenPortOpener,
recorder: recorder, recorder: recorder,
serviceHealthServer: serviceHealthServer, serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer, healthzServer: healthzServer,
@ -945,9 +941,6 @@ func (proxier *Proxier) syncProxyRules() {
// Accumulate NAT chains to keep. // Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[netutils.LocalPort]netutils.Closeable{}
// We are creating those slices ones here to avoid memory reallocations // We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing: // in every loop. Note that reuse the memory, instead of doing:
// slice = <some new slice> // slice = <some new slice>
@ -969,7 +962,6 @@ func (proxier *Proxier) syncProxyRules() {
proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName]) proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
} }
localAddrSet := utilproxy.GetLocalAddrSet()
nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses) klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses)
@ -993,10 +985,6 @@ func (proxier *Proxier) syncProxyRules() {
continue continue
} }
isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP()) isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP())
localPortIPFamily := netutils.IPv4
if isIPv6 {
localPortIPFamily = netutils.IPv6
}
protocol := strings.ToLower(string(svcInfo.Protocol())) protocol := strings.ToLower(string(svcInfo.Protocol()))
svcNameString := svcInfo.serviceNameString svcNameString := svcInfo.serviceNameString
@ -1149,20 +1137,6 @@ func (proxier *Proxier) syncProxyRules() {
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPStrings() { for _, externalIP := range svcInfo.ExternalIPStrings() {
// If the "external" IP happens to be an IP that is local to this
// machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work).
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) {
lp := netutils.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
proxier.openPort(lp, replacementPortsMap)
}
if hasEndpoints { if hasEndpoints {
args = append(args[:0], args = append(args[:0],
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
@ -1295,23 +1269,6 @@ func (proxier *Proxier) syncProxyRules() {
// worthwhile to make a new per-service chain for nodeport rules, but // worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden. // with just 2 rules it ends up being a waste and a cognitive burden.
if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 { if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 {
// Hold the local port open so no other process can open it
// (because the socket might open but it would never work).
// nodeAddresses only contains the addresses for this proxier's IP family.
for address := range nodeAddresses {
if utilproxy.IsZeroCIDR(address) {
address = ""
}
lp := netutils.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
proxier.openPort(lp, replacementPortsMap)
}
if hasEndpoints { if hasEndpoints {
args = append(args[:0], args = append(args[:0],
@ -1537,9 +1494,6 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Failed to execute iptables-restore") klog.ErrorS(err, "Failed to execute iptables-restore")
} }
metrics.IptablesRestoreFailuresTotal.Inc() metrics.IptablesRestoreFailuresTotal.Inc()
// Revert new local ports.
klog.V(2).InfoS("Closing local ports after iptables-restore failure")
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return return
} }
success = true success = true
@ -1552,14 +1506,6 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
if replacementPortsMap[k] == nil {
v.Close()
}
}
proxier.portsMap = replacementPortsMap
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated()
} }
@ -1595,36 +1541,6 @@ func (proxier *Proxier) syncProxyRules() {
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
} }
func (proxier *Proxier) openPort(lp netutils.LocalPort, replacementPortsMap map[netutils.LocalPort]netutils.Closeable) {
// We don't open ports for SCTP services
if lp.Protocol == netutils.Protocol(v1.ProtocolSCTP) {
return
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
return
}
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp)
return
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) { func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) {
// First write session affinity rules, if applicable. // First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {

View File

@ -361,26 +361,6 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
} }
} }
// fakeCloseable implements utilproxy.Closeable
type fakeCloseable struct{}
// Close fakes out the close() used by syncProxyRules to release a local port.
func (f *fakeCloseable) Close() error {
return nil
}
// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*netutils.LocalPort
}
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *netutils.LocalPort) (netutils.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return &fakeCloseable{}, nil
}
// Conventions for tests using NewFakeProxier: // Conventions for tests using NewFakeProxier:
// //
// Pod IPs: 10.0.0.0/8 // Pod IPs: 10.0.0.0/8
@ -427,8 +407,6 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
masqueradeMark: "0x4000", masqueradeMark: "0x4000",
localDetector: detectLocal, localDetector: detectLocal,
hostname: testHostname, hostname: testHostname,
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
portMapper: &fakePortOpener{[]*netutils.LocalPort{}},
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
precomputedProbabilities: make([]string, 0, 1001), precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
@ -1902,15 +1880,6 @@ COMMIT
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT COMMIT
` `
assert.Equal(t, []*netutils.LocalPort{
{
Description: "nodePort for ns1/svc1:p80",
IP: "",
IPFamily: netutils.IPv4,
Port: svcNodePort,
Protocol: netutils.TCP,
},
}, fp.portMapper.(*fakePortOpener).openPorts)
assertIPTablesRulesEqual(t, expected, fp.iptablesData.String()) assertIPTablesRulesEqual(t, expected, fp.iptablesData.String())
} }

View File

@ -222,7 +222,6 @@ type Proxier struct {
mu sync.Mutex // protects the following fields mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap endpointsMap proxy.EndpointsMap
portsMap map[netutils.LocalPort]netutils.Closeable
nodeLabels map[string]string nodeLabels map[string]string
// endpointSlicesSynced, and servicesSynced are set to true when // endpointSlicesSynced, and servicesSynced are set to true when
// corresponding objects are synced after startup. This is used to avoid updating // corresponding objects are synced after startup. This is used to avoid updating
@ -248,7 +247,6 @@ type Proxier struct {
localDetector proxyutiliptables.LocalTrafficDetector localDetector proxyutiliptables.LocalTrafficDetector
hostname string hostname string
nodeIP net.IP nodeIP net.IP
portMapper netutils.PortOpener
recorder events.EventRecorder recorder events.EventRecorder
serviceHealthServer healthcheck.ServiceHealthServer serviceHealthServer healthcheck.ServiceHealthServer
@ -453,7 +451,6 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{ proxier := &Proxier{
ipFamily: ipFamily, ipFamily: ipFamily,
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
@ -468,7 +465,6 @@ func NewProxier(ipt utiliptables.Interface,
localDetector: localDetector, localDetector: localDetector,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
portMapper: &netutils.ListenPortOpener,
recorder: recorder, recorder: recorder,
serviceHealthServer: serviceHealthServer, serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer, healthzServer: healthzServer,
@ -1061,8 +1057,6 @@ func (proxier *Proxier) syncProxyRules() {
set.resetEntries() set.resetEntries()
} }
// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[netutils.LocalPort]netutils.Closeable{}
// activeIPVSServices represents IPVS service successfully created in this round of sync // activeIPVSServices represents IPVS service successfully created in this round of sync
activeIPVSServices := map[string]bool{} activeIPVSServices := map[string]bool{}
// currentIPVSServices represent IPVS services listed from the system // currentIPVSServices represent IPVS services listed from the system
@ -1127,8 +1121,6 @@ func (proxier *Proxier) syncProxyRules() {
// reset slice to filtered entries // reset slice to filtered entries
nodeIPs = nodeIPs[:idx] nodeIPs = nodeIPs[:idx]
localAddrSet := utilproxy.GetLocalAddrSet()
// Build IPVS rules for each service. // Build IPVS rules for each service.
for svcName, svc := range proxier.serviceMap { for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo) svcInfo, ok := svc.(*serviceInfo)
@ -1222,41 +1214,6 @@ func (proxier *Proxier) syncProxyRules() {
// Capture externalIPs. // Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPStrings() { for _, externalIP := range svcInfo.ExternalIPStrings() {
// If the "external" IP happens to be an IP that is local to this
// machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work).
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) {
// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
lp := netutils.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
} // We're holding the port, so it's OK to install IPVS rules.
// ipset call // ipset call
entry := &utilipset.Entry{ entry := &utilipset.Entry{
IP: externalIP, IP: externalIP,
@ -1430,33 +1387,9 @@ func (proxier *Proxier) syncProxyRules() {
// For ports on node IPs, open the actual port and hold it. // For ports on node IPs, open the actual port and hold it.
for _, lp := range lps { for _, lp := range lps {
if proxier.portsMap[lp] != nil { if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == netutils.UDP {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
// We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open port %s, skipping it", lp.String())
proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp)
if lp.Protocol == netutils.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
} }
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
} }
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
@ -1614,8 +1547,6 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes())) klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
} }
metrics.IptablesRestoreFailuresTotal.Inc() metrics.IptablesRestoreFailuresTotal.Inc()
// Revert new local ports.
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return return
} }
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
@ -1626,14 +1557,6 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
if replacementPortsMap[k] == nil {
v.Close()
}
}
proxier.portsMap = replacementPortsMap
// Get legacy bind address // Get legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice) currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)

View File

@ -70,18 +70,6 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
return f.bindedIPs, nil return f.bindedIPs, nil
} }
// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*netutils.LocalPort
}
// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *netutils.LocalPort) (netutils.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return nil, nil
}
// fakeKernelHandler implements KernelHandler. // fakeKernelHandler implements KernelHandler.
type fakeKernelHandler struct { type fakeKernelHandler struct {
modules []string modules []string
@ -153,8 +141,6 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
strictARP: false, strictARP: false,
localDetector: proxyutiliptables.NewNoOpLocalDetector(), localDetector: proxyutiliptables.NewNoOpLocalDetector(),
hostname: testHostname, hostname: testHostname,
portsMap: make(map[netutils.LocalPort]netutils.Closeable),
portMapper: &fakePortOpener{[]*netutils.LocalPort{}},
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
ipvsScheduler: DefaultScheduler, ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},