mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
proxy/conntrack: use proxier ip family for conntrack cleanup
Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
parent
a6b4aa7005
commit
c34b20fa63
@ -32,27 +32,25 @@ import (
|
||||
)
|
||||
|
||||
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
|
||||
func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap,
|
||||
func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap,
|
||||
serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
deleteStaleServiceConntrackEntries(ct, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
|
||||
deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult)
|
||||
deleteStaleServiceConntrackEntries(ct, ipFamily, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
|
||||
deleteStaleEndpointConntrackEntries(ct, ipFamily, svcPortMap, endpointsUpdateResult)
|
||||
}
|
||||
|
||||
// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
|
||||
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
|
||||
// may create "black hole" entries for that IP+port. When the service gets endpoints we
|
||||
// need to delete those entries so further traffic doesn't get dropped.
|
||||
func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
func deleteStaleServiceConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
var filters []netlink.CustomConntrackFilter
|
||||
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
|
||||
conntrackCleanupServiceNodePorts := sets.New[int]()
|
||||
isIPv6 := false
|
||||
|
||||
// merge newly active services gathered from endpointsUpdateResult
|
||||
// a UDP service that changes from 0 to non-0 endpoints is newly active.
|
||||
for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices {
|
||||
if svcInfo, ok := svcPortMap[svcPortName]; ok {
|
||||
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
|
||||
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
|
||||
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
|
||||
for _, extIP := range svcInfo.ExternalIPs() {
|
||||
@ -77,7 +75,7 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo
|
||||
filters = append(filters, filterForPort(nodePort, v1.ProtocolUDP))
|
||||
}
|
||||
|
||||
if err := ct.ClearEntries(getUnixIPFamily(isIPv6), filters...); err != nil {
|
||||
if err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil {
|
||||
klog.ErrorS(err, "Failed to delete stale service connections")
|
||||
}
|
||||
}
|
||||
@ -85,12 +83,10 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo
|
||||
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
|
||||
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
|
||||
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
|
||||
func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
func deleteStaleEndpointConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
|
||||
var filters []netlink.CustomConntrackFilter
|
||||
isIPv6 := false
|
||||
for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
|
||||
if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
|
||||
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
|
||||
endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
|
||||
nodePort := svcInfo.NodePort()
|
||||
if nodePort != 0 {
|
||||
@ -107,17 +103,15 @@ func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServiceP
|
||||
}
|
||||
}
|
||||
|
||||
if err := ct.ClearEntries(getUnixIPFamily(isIPv6), filters...); err != nil {
|
||||
if err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil {
|
||||
klog.ErrorS(err, "Failed to delete stale endpoint connections")
|
||||
}
|
||||
}
|
||||
|
||||
// getUnixIPFamily returns the unix IPFamily constant.
|
||||
func getUnixIPFamily(isIPv6 bool) uint8 {
|
||||
if isIPv6 {
|
||||
return unix.AF_INET6
|
||||
}
|
||||
return unix.AF_INET
|
||||
// ipFamilyMap maps v1.IPFamily to the corresponding unix constant.
|
||||
var ipFamilyMap = map[v1.IPFamily]uint8{
|
||||
v1.IPv4Protocol: unix.AF_INET,
|
||||
v1.IPv6Protocol: unix.AF_INET6,
|
||||
}
|
||||
|
||||
// protocolMap maps v1.Protocol to the Assigned Internet Protocol Number.
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
testIPFamily = v1.IPv4Protocol
|
||||
testClusterIP = "172.30.1.1"
|
||||
testExternalIP = "192.168.99.100"
|
||||
testLoadBalancerIP = "1.2.3.4"
|
||||
@ -249,7 +250,7 @@ func TestCleanStaleEntries(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
fake := NewFake()
|
||||
CleanStaleEntries(fake, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates)
|
||||
CleanStaleEntries(fake, testIPFamily, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates)
|
||||
if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) {
|
||||
t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs)
|
||||
}
|
||||
|
@ -1595,7 +1595,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Finish housekeeping, clear stale conntrack entries for UDP Services
|
||||
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
|
||||
|
@ -1498,7 +1498,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
|
||||
|
||||
// Finish housekeeping, clear stale conntrack entries for UDP Services
|
||||
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
}
|
||||
|
||||
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
|
||||
|
@ -1839,7 +1839,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Finish housekeeping, clear stale conntrack entries for UDP Services
|
||||
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
|
||||
|
Loading…
Reference in New Issue
Block a user