From b23cb9770497c81d40e5bf1eb2fe458cbd910c10 Mon Sep 17 00:00:00 2001 From: Daman Date: Thu, 2 Mar 2023 20:54:34 +0530 Subject: [PATCH 1/2] proxier: syncing ipvs conntrack cleaning with iptables. --- pkg/proxy/ipvs/proxier.go | 82 +++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 42 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 305d003a8d3..ad4b1b81f5a 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -946,17 +946,26 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - staleServices := serviceUpdateResult.UDPStaleClusterIP + // We need to detect stale connections to UDP Services so we + // can clean dangling conntrack entries that can blackhole traffic. + conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP + conntrackCleanupServiceNodePorts := sets.NewInt() // merge stale services gathered from updateEndpointsMap + // a UDP service that changes from 0 to non-0 endpoints is considered stale. 
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) - staleServices.Insert(svcInfo.ClusterIP().String()) + conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { - staleServices.Insert(extIP) + conntrackCleanupServiceIPs.Insert(extIP) } - for _, extIP := range svcInfo.LoadBalancerIPStrings() { - staleServices.Insert(extIP) + for _, lbIP := range svcInfo.LoadBalancerIPStrings() { + conntrackCleanupServiceIPs.Insert(lbIP) + } + nodePort := svcInfo.NodePort() + if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { + klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort) + conntrackCleanupServiceNodePorts.Insert(nodePort) } } } @@ -1061,11 +1070,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName) continue } - isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP()) - localPortIPFamily := netutils.IPv4 - if isIPv6 { - localPortIPFamily = netutils.IPv6 - } + protocol := strings.ToLower(string(svcInfo.Protocol())) // Precompute svcNameString; with many services the many calls // to ServicePortName.String() show up in CPU profiles. 
@@ -1310,32 +1315,6 @@ func (proxier *Proxier) syncProxyRules() { continue } - var lps []netutils.LocalPort - for _, address := range nodeAddresses { - lp := netutils.LocalPort{ - Description: "nodePort for " + svcPortNameString, - IP: address, - IPFamily: localPortIPFamily, - Port: svcInfo.NodePort(), - Protocol: netutils.Protocol(svcInfo.Protocol()), - } - if utilproxy.IsZeroCIDR(address) { - // Empty IP address means all - lp.IP = "" - lps = append(lps, lp) - // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses. - break - } - lps = append(lps, lp) - } - - // For ports on node IPs, open the actual port and hold it. - for _, lp := range lps { - if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == netutils.UDP { - conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP) - } - } - // Nodeports need SNAT, unless they're local. // ipset call @@ -1548,12 +1527,22 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping. + // Clear stale conntrack entries for UDP Services, this has to be done AFTER the ipvs rules are programmed. // TODO: these could be made more consistent. 
- for _, svcIP := range staleServices.UnsortedList() { + klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList()) + for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil { - klog.ErrorS(err, "Failed to delete stale service IP connections", "IP", svcIP) + klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP) } } + klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList()) + for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { + err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, proxier.ipFamily == v1.IPv6Protocol, v1.ProtocolUDP) + if err != nil { + klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) + } + } + klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) @@ -1825,27 +1814,36 @@ func (proxier *Proxier) createAndLinkKubeChain() { } // After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost (because UDP). +// risk sending more traffic to it, all of which will be lost. 
// This assumes the proxier mutex is held +// TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) + nodePort := svcInfo.NodePort() svcProto := svcInfo.Protocol() - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + var err error + if nodePort != 0 { + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + if err != nil { + klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) + } + } + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "IP", extIP) + klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) if err != nil { - klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "IP", lbIP) + klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } } } From 42a91c29e5903c1f93cc9399635043a14bc99715 Mon Sep 17 00:00:00 2001 From: Daman 
Date: Thu, 2 Mar 2023 20:56:05 +0530 Subject: [PATCH 2/2] proxier: track metrics before conntrack cleaning --- pkg/proxy/ipvs/proxier.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index ad4b1b81f5a..735554f0647 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1526,6 +1526,9 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Error syncing healthcheck endpoints") } + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) + metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) + // Finish housekeeping. // Clear stale conntrack entries for UDP Services, this has to be done AFTER the ipvs rules are programmed. // TODO: these could be made more consistent. @@ -1544,9 +1547,6 @@ func (proxier *Proxier) syncProxyRules() { } klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) - - metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) - metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed