diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 61c705cfa8b..ba2f1fa4a01 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -190,7 +190,6 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[netutils.LocalPort]netutils.Closeable nodeLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true // when corresponding objects are synced after startup. This is used to avoid @@ -209,7 +208,6 @@ type Proxier struct { localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP - portMapper netutils.PortOpener recorder events.EventRecorder serviceHealthServer healthcheck.ServiceHealthServer @@ -299,7 +297,6 @@ func NewProxier(ipt utiliptables.Interface, } proxier := &Proxier{ - portsMap: make(map[netutils.LocalPort]netutils.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), @@ -312,7 +309,6 @@ func NewProxier(ipt utiliptables.Interface, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, - portMapper: &netutils.ListenPortOpener, recorder: recorder, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, @@ -945,9 +941,6 @@ func (proxier *Proxier) syncProxyRules() { // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[netutils.LocalPort]netutils.Closeable{} - // We are creating those slices ones here to avoid memory reallocations // in every loop. Note that reuse the memory, instead of doing: // slice = @@ -969,7 +962,6 @@ func (proxier *Proxier) syncProxyRules() { proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName]) } - localAddrSet := utilproxy.GetLocalAddrSet() nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) if err != nil { klog.ErrorS(err, "Failed to get node ip address matching nodeport cidrs, services with nodeport may not work as intended", "CIDRs", proxier.nodePortAddresses) @@ -993,10 +985,6 @@ func (proxier *Proxier) syncProxyRules() { continue } isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP()) - localPortIPFamily := netutils.IPv4 - if isIPv6 { - localPortIPFamily = netutils.IPv6 - } protocol := strings.ToLower(string(svcInfo.Protocol())) svcNameString := svcInfo.serviceNameString @@ -1149,20 +1137,6 @@ func (proxier *Proxier) syncProxyRules() { // Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { - // If the "external" IP happens to be an IP that is local to this - // machine, hold the local port open so no other process can open it - // (because the socket might open but it would never work). - if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) { - lp := netutils.LocalPort{ - Description: "externalIP for " + svcNameString, - IP: externalIP, - IPFamily: localPortIPFamily, - Port: svcInfo.Port(), - Protocol: netutils.Protocol(svcInfo.Protocol()), - } - proxier.openPort(lp, replacementPortsMap) - } - if hasEndpoints { args = append(args[:0], "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), @@ -1295,23 +1269,6 @@ func (proxier *Proxier) syncProxyRules() { // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 { - // Hold the local port open so no other process can open it - // (because the socket might open but it would never work). - - // nodeAddresses only contains the addresses for this proxier's IP family. - for address := range nodeAddresses { - if utilproxy.IsZeroCIDR(address) { - address = "" - } - lp := netutils.LocalPort{ - Description: "nodePort for " + svcNameString, - IP: address, - IPFamily: localPortIPFamily, - Port: svcInfo.NodePort(), - Protocol: netutils.Protocol(svcInfo.Protocol()), - } - proxier.openPort(lp, replacementPortsMap) - } if hasEndpoints { args = append(args[:0], @@ -1537,9 +1494,6 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Failed to execute iptables-restore") } metrics.IptablesRestoreFailuresTotal.Inc() - // Revert new local ports. - klog.V(2).InfoS("Closing local ports after iptables-restore failure") - utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } success = true @@ -1552,14 +1506,6 @@ func (proxier *Proxier) syncProxyRules() { } } - // Close old local ports and save new ones. - for k, v := range proxier.portsMap { - if replacementPortsMap[k] == nil { - v.Close() - } - } - proxier.portsMap = replacementPortsMap - if proxier.healthzServer != nil { proxier.healthzServer.Updated() } @@ -1595,36 +1541,6 @@ func (proxier *Proxier) syncProxyRules() { proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } -func (proxier *Proxier) openPort(lp netutils.LocalPort, replacementPortsMap map[netutils.LocalPort]netutils.Closeable) { - // We don't open ports for SCTP services - if lp.Protocol == netutils.Protocol(v1.ProtocolSCTP) { - return - } - - if proxier.portsMap[lp] != nil { - klog.V(4).InfoS("Port was open before and is still needed", "port", lp) - replacementPortsMap[lp] = proxier.portsMap[lp] - return - } - - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - msg := fmt.Sprintf("can't open port %s, skipping it", lp.String()) - proxier.recorder.Eventf( - &v1.ObjectReference{ - Kind: "Node", - Name: proxier.hostname, - UID: types.UID(proxier.hostname), - Namespace: "", - }, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg) - klog.ErrorS(err, "can't open port, skipping it", "port", lp) - return - } - - klog.V(2).InfoS("Opened local port", "port", lp) - replacementPortsMap[lp] = socket -} - func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) { // First write session affinity rules, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 9ca959773f1..441e4416ed7 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -361,26 +361,6 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { } } -// fakeCloseable implements utilproxy.Closeable -type fakeCloseable struct{} - -// Close fakes out the close() used by syncProxyRules to release a local port. -func (f *fakeCloseable) Close() error { - return nil -} - -// fakePortOpener implements portOpener. -type fakePortOpener struct { - openPorts []*netutils.LocalPort -} - -// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules -// to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *netutils.LocalPort) (netutils.Closeable, error) { - f.openPorts = append(f.openPorts, lp) - return &fakeCloseable{}, nil -} - // Conventions for tests using NewFakeProxier: // // Pod IPs: 10.0.0.0/8 @@ -427,8 +407,6 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { masqueradeMark: "0x4000", localDetector: detectLocal, hostname: testHostname, - portsMap: make(map[netutils.LocalPort]netutils.Closeable), - portMapper: &fakePortOpener{[]*netutils.LocalPort{}}, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), @@ -1902,15 +1880,6 @@ COMMIT -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT ` - assert.Equal(t, []*netutils.LocalPort{ - { - Description: "nodePort for ns1/svc1:p80", - IP: "", - IPFamily: netutils.IPv4, - Port: svcNodePort, - Protocol: netutils.TCP, - }, - }, fp.portMapper.(*fakePortOpener).openPorts) assertIPTablesRulesEqual(t, expected, fp.iptablesData.String()) } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 8eb64b1de7d..ba7a86d3d39 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -222,7 +222,6 @@ type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap - portsMap map[netutils.LocalPort]netutils.Closeable nodeLabels map[string]string // endpointSlicesSynced, and servicesSynced are set to true when // corresponding objects are synced after startup. This is used to avoid updating @@ -248,7 +247,6 @@ type Proxier struct { localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP - portMapper netutils.PortOpener recorder events.EventRecorder serviceHealthServer healthcheck.ServiceHealthServer @@ -453,7 +451,6 @@ func NewProxier(ipt utiliptables.Interface, proxier := &Proxier{ ipFamily: ipFamily, - portsMap: make(map[netutils.LocalPort]netutils.Closeable), serviceMap: make(proxy.ServiceMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), endpointsMap: make(proxy.EndpointsMap), @@ -468,7 +465,6 @@ func NewProxier(ipt utiliptables.Interface, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, - portMapper: &netutils.ListenPortOpener, recorder: recorder, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, @@ -1061,8 +1057,6 @@ func (proxier *Proxier) syncProxyRules() { set.resetEntries() } - // Accumulate the set of local ports that we will be holding open once this update is complete - replacementPortsMap := map[netutils.LocalPort]netutils.Closeable{} // activeIPVSServices represents IPVS service successfully created in this round of sync activeIPVSServices := map[string]bool{} // currentIPVSServices represent IPVS services listed from the system @@ -1127,8 +1121,6 @@ func (proxier *Proxier) syncProxyRules() { // reset slice to filtered entries nodeIPs = nodeIPs[:idx] - localAddrSet := utilproxy.GetLocalAddrSet() - // Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { svcInfo, ok := svc.(*serviceInfo) @@ -1222,41 +1214,6 @@ func (proxier *Proxier) syncProxyRules() { // Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { - // If the "external" IP happens to be an IP that is local to this - // machine, hold the local port open so no other process can open it - // (because the socket might open but it would never work). - if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(netutils.ParseIPSloppy(externalIP)) { - // We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP - lp := netutils.LocalPort{ - Description: "externalIP for " + svcNameString, - IP: externalIP, - IPFamily: localPortIPFamily, - Port: svcInfo.Port(), - Protocol: netutils.Protocol(svcInfo.Protocol()), - } - if proxier.portsMap[lp] != nil { - klog.V(4).InfoS("Port was open before and is still needed", "port", lp) - replacementPortsMap[lp] = proxier.portsMap[lp] - } else { - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - msg := fmt.Sprintf("can't open port %s, skipping it", lp.String()) - - proxier.recorder.Eventf( - &v1.ObjectReference{ - Kind: "Node", - Name: proxier.hostname, - UID: types.UID(proxier.hostname), - Namespace: "", - }, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg) - klog.ErrorS(err, "Can't open port, skipping it", "port", lp) - continue - } - klog.V(2).InfoS("Opened local port", "port", lp) - replacementPortsMap[lp] = socket - } - } // We're holding the port, so it's OK to install IPVS rules. - // ipset call entry := &utilipset.Entry{ IP: externalIP, @@ -1430,33 +1387,9 @@ func (proxier *Proxier) syncProxyRules() { // For ports on node IPs, open the actual port and hold it. for _, lp := range lps { - if proxier.portsMap[lp] != nil { - klog.V(4).InfoS("Port was open before and is still needed", "port", lp) - replacementPortsMap[lp] = proxier.portsMap[lp] - // We do not start listening on SCTP ports, according to our agreement in the - // SCTP support KEP - } else if svcInfo.Protocol() != v1.ProtocolSCTP { - socket, err := proxier.portMapper.OpenLocalPort(&lp) - if err != nil { - msg := fmt.Sprintf("can't open port %s, skipping it", lp.String()) - - proxier.recorder.Eventf( - &v1.ObjectReference{ - Kind: "Node", - Name: proxier.hostname, - UID: types.UID(proxier.hostname), - Namespace: "", - }, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg) - klog.ErrorS(err, "Can't open port, skipping it", "port", lp) - continue - } - klog.V(2).InfoS("Opened local port", "port", lp) - - if lp.Protocol == netutils.UDP { - conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) - } - replacementPortsMap[lp] = socket - } // We're holding the port, so it's OK to install ipvs rules. + if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == netutils.UDP { + conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) + } } // Nodeports need SNAT, unless they're local. @@ -1614,8 +1547,6 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes())) } metrics.IptablesRestoreFailuresTotal.Inc() - // Revert new local ports. - utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { @@ -1626,14 +1557,6 @@ func (proxier *Proxier) syncProxyRules() { } } - // Close old local ports and save new ones. - for k, v := range proxier.portsMap { - if replacementPortsMap[k] == nil { - v.Close() - } - } - proxier.portsMap = replacementPortsMap - // Get legacy bind address // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index b25431e3b2a..33d8a7e7e40 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -70,18 +70,6 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) { return f.bindedIPs, nil } -// fakePortOpener implements portOpener. -type fakePortOpener struct { - openPorts []*netutils.LocalPort -} - -// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules -// to lock a local port. -func (f *fakePortOpener) OpenLocalPort(lp *netutils.LocalPort) (netutils.Closeable, error) { - f.openPorts = append(f.openPorts, lp) - return nil, nil -} - // fakeKernelHandler implements KernelHandler. type fakeKernelHandler struct { modules []string @@ -153,8 +141,6 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u strictARP: false, localDetector: proxyutiliptables.NewNoOpLocalDetector(), hostname: testHostname, - portsMap: make(map[netutils.LocalPort]netutils.Closeable), - portMapper: &fakePortOpener{[]*netutils.LocalPort{}}, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), ipvsScheduler: DefaultScheduler, ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},