Merge pull request #116171 from daman1807/conntrack-sync

Syncing IPVS conntrack cleaning with IPTables.
This commit is contained in:
Kubernetes Prow Robot 2023-03-03 06:18:57 -08:00 committed by GitHub
commit d446bebca8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -946,17 +946,26 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP
// We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
staleServices.Insert(svcInfo.ClusterIP().String())
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
conntrackCleanupServiceIPs.Insert(extIP)
}
for _, extIP := range svcInfo.LoadBalancerIPStrings() {
staleServices.Insert(extIP)
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP)
}
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort)
conntrackCleanupServiceNodePorts.Insert(nodePort)
}
}
}
@ -1061,11 +1070,7 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
continue
}
isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP())
localPortIPFamily := netutils.IPv4
if isIPv6 {
localPortIPFamily = netutils.IPv6
}
protocol := strings.ToLower(string(svcInfo.Protocol()))
// Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles.
@ -1310,32 +1315,6 @@ func (proxier *Proxier) syncProxyRules() {
continue
}
var lps []netutils.LocalPort
for _, address := range nodeAddresses {
lp := netutils.LocalPort{
Description: "nodePort for " + svcPortNameString,
IP: address,
IPFamily: localPortIPFamily,
Port: svcInfo.NodePort(),
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
lps = append(lps, lp)
}
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == netutils.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
}
}
// Nodeports need SNAT, unless they're local.
// ipset call
@ -1547,17 +1526,27 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping.
// TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service IP connections", "IP", svcIP)
}
}
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
// Finish housekeeping.
// Clear stale conntrack entries for UDP Services, this has to be done AFTER the ipvs rules are programmed.
// TODO: these could be made more consistent.
klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, proxier.ipFamily == v1.IPv6Protocol, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
}
}
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
@ -1825,27 +1814,36 @@ func (proxier *Proxier) createAndLinkKubeChain() {
}
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// risk sending more traffic to it, all of which will be lost.
// This assumes the proxier mutex is held
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "IP", extIP)
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "IP", lbIP)
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}
}
}