diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index 6debead9a38..c39794c919b 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -32,27 +32,25 @@ import ( ) // CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints. -func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap, +func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - deleteStaleServiceConntrackEntries(ct, svcPortMap, serviceUpdateResult, endpointsUpdateResult) - deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult) + deleteStaleServiceConntrackEntries(ct, ipFamily, svcPortMap, serviceUpdateResult, endpointsUpdateResult) + deleteStaleEndpointConntrackEntries(ct, ipFamily, svcPortMap, endpointsUpdateResult) } // deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related // to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack // may create "black hole" entries for that IP+port. When the service gets endpoints we // need to delete those entries so further traffic doesn't get dropped. -func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { +func deleteStaleServiceConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { var filters []netlink.CustomConntrackFilter conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.New[int]() - isIPv6 := false // merge newly active services gathered from endpointsUpdateResult // a UDP service that changes from 0 to non-0 endpoints is newly active. for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices { if svcInfo, ok := svcPortMap[svcPortName]; ok { - isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP()) klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPs() { @@ -77,7 +75,7 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo filters = append(filters, filterForPort(nodePort, v1.ProtocolUDP)) } - if err := ct.ClearEntries(getUnixIPFamily(isIPv6), filters...); err != nil { + if err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil { klog.ErrorS(err, "Failed to delete stale service connections") } } @@ -85,12 +83,10 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo // deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related // to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries // for it so that if the same client keeps sending, the packets will get routed to a new endpoint. -func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { +func deleteStaleEndpointConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { var filters []netlink.CustomConntrackFilter - isIPv6 := false for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints { if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok { - isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP()) endpointIP := proxyutil.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() if nodePort != 0 { @@ -107,17 +103,15 @@ func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServiceP } } - if err := ct.ClearEntries(getUnixIPFamily(isIPv6), filters...); err != nil { + if err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil { klog.ErrorS(err, "Failed to delete stale endpoint connections") } } -// getUnixIPFamily returns the unix IPFamily constant. -func getUnixIPFamily(isIPv6 bool) uint8 { - if isIPv6 { - return unix.AF_INET6 - } - return unix.AF_INET +// ipFamilyMap maps v1.IPFamily to the corresponding unix constant. +var ipFamilyMap = map[v1.IPFamily]uint8{ + v1.IPv4Protocol: unix.AF_INET, + v1.IPv6Protocol: unix.AF_INET6, } // protocolMap maps v1.Protocol to the Assigned Internet Protocol Number. diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go index e17961050dd..134f4d724d1 100644 --- a/pkg/proxy/conntrack/cleanup_test.go +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -36,6 +36,7 @@ import ( ) const ( + testIPFamily = v1.IPv4Protocol testClusterIP = "172.30.1.1" testExternalIP = "192.168.99.100" testLoadBalancerIP = "1.2.3.4" @@ -249,7 +250,7 @@ func TestCleanStaleEntries(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { fake := NewFake() - CleanStaleEntries(fake, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates) + CleanStaleEntries(fake, testIPFamily, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates) if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) { t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a87c70ec4ae..9ff20521f5a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1595,7 +1595,7 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 8c9f8ff02da..2378c075ee0 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1498,7 +1498,7 @@ func (proxier *Proxier) syncProxyRules() { metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 7da83173ecc..7c419b9c54e 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -1839,7 +1839,7 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {