conntrack reconciler must check the dst port

The conntrack reconciler maintains the consistency between the
conntrack table on each node and the desired state of Kubernetes UDP services.

A valid entry matches a service's ClusterIP, LoadBalancerIP, or ExternalIP and Service port,
or any ip matching a NodePort, and has a reverse source IP matching an active endpoint for
that service. Other entries are deleted.

Services without endpoints and traffic not handled by kube-proxy are ignored

Co-authored-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Antonio Ojea 2025-02-27 22:14:15 +00:00
parent 0821aa2308
commit 2a60692f5b
3 changed files with 346 additions and 78 deletions

View File

@ -20,6 +20,8 @@ limitations under the License.
package conntrack
import (
"net"
"strconv"
"time"
"github.com/vishvananda/netlink"
@ -32,8 +34,14 @@ import (
netutils "k8s.io/utils/net"
)
// Kubernetes UDP services can be affected by stale conntrack entries.
// These entries may point to endpoints that no longer exist,
// leading to packet loss and connectivity problems.
// CleanStaleEntries scans conntrack table and removes any entries
// for a service that do not correspond to a serving endpoint.
// List existing conntrack entries and calculate the desired conntrack state
// based on the current Services and Endpoints.
func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) {
@ -46,7 +54,7 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
return
}
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs)
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port
// to the set of serving endpoint IPs.
serviceIPEndpointIPs := make(map[string]sets.Set[string])
// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
@ -73,14 +81,28 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
}
}
serviceIPEndpointIPs[svc.ClusterIP().String()] = endpointIPs
// a Service without endpoints does not require to clean the conntrack entries associated.
if endpointIPs.Len() == 0 {
continue
}
// we need to filter entries that are directed to a Service IP:Port frontend
// that does not have a backend as part of the endpoints IPs
portStr := strconv.Itoa(svc.Port())
// clusterIP:Port
serviceIPEndpointIPs[net.JoinHostPort(svc.ClusterIP().String(), portStr)] = endpointIPs
// loadbalancerIP:Port
for _, loadBalancerIP := range svc.LoadBalancerVIPs() {
serviceIPEndpointIPs[loadBalancerIP.String()] = endpointIPs
serviceIPEndpointIPs[net.JoinHostPort(loadBalancerIP.String(), portStr)] = endpointIPs
}
// externalIP:Port
for _, externalIP := range svc.ExternalIPs() {
serviceIPEndpointIPs[externalIP.String()] = endpointIPs
serviceIPEndpointIPs[net.JoinHostPort(externalIP.String(), portStr)] = endpointIPs
}
// we need to filter entries that are directed to a *:NodePort
// that does not have a backend as part of the endpoints IPs
if svc.NodePort() != 0 {
// *:NodePort
serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs
}
}
@ -92,26 +114,25 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
continue
}
origDst := entry.Forward.DstIP.String()
origPortDst := int(entry.Forward.DstPort)
replySrc := entry.Reverse.SrcIP.String()
origDst := entry.Forward.DstIP.String() // match Service IP
origPortDst := int(entry.Forward.DstPort) // match Service Port
origPortDstStr := strconv.Itoa(origPortDst)
replySrc := entry.Reverse.SrcIP.String() // match Serving Endpoint IP
// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
// LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of
// any serving endpoint, we clear the entry.
if _, ok := serviceIPEndpointIPs[origDst]; ok {
if !serviceIPEndpointIPs[origDst].Has(replySrc) {
filters = append(filters, filterForNAT(origDst, replySrc, v1.ProtocolUDP))
}
// LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) of the flow is service Port
// and the reply source (--reply-src) is not IP of any serving endpoint, we clear the entry.
endpoints, ok := serviceIPEndpointIPs[net.JoinHostPort(origDst, origPortDstStr)]
if ok && !endpoints.Has(replySrc) {
filters = append(filters, filterForIPPortNAT(origDst, replySrc, entry.Forward.DstPort, v1.ProtocolUDP))
}
// if the original port destination (--orig-port-dst) of the flow is service
// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
// we clear the entry.
if _, ok := serviceNodePortEndpointIPs[origPortDst]; ok {
if !serviceNodePortEndpointIPs[origPortDst].Has(replySrc) {
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
}
endpoints, ok = serviceNodePortEndpointIPs[origPortDst]
if ok && !endpoints.Has(replySrc) {
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
}
}
@ -136,14 +157,16 @@ var protocolMap = map[v1.Protocol]uint8{
v1.ProtocolSCTP: unix.IPPROTO_SCTP,
}
// filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
// specified by the destination IP (original direction) and source IP (reply direction).
func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter {
klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
// filterForIPPortNAT returns *conntrackFilter to delete the conntrack entries for connections
// specified by the destination IP (original direction) and destination port (original direction)
// and source IP (reply direction).
func filterForIPPortNAT(origin, dest string, dstPort uint16, protocol v1.Protocol) *conntrackFilter {
klog.V(6).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
return &conntrackFilter{
protocol: protocolMap[protocol],
original: &connectionTuple{
dstIP: netutils.ParseIPSloppy(origin),
dstIP: netutils.ParseIPSloppy(origin),
dstPort: dstPort,
},
reply: &connectionTuple{
srcIP: netutils.ParseIPSloppy(dest),

View File

@ -21,9 +21,11 @@ package conntrack
import (
"fmt"
"math/rand"
"sort"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
@ -50,8 +52,11 @@ const (
testNonServingEndpointIP = "10.240.1.5"
testDeletedEndpointIP = "10.240.2.6"
testPort = 8000
testNodePort = 32000
testServicePort = 8000
testServiceNodePort = 32000
// testNonServicePort is used to mock conntrack flow entries which are not owned by
// kube-proxy and reconciler should not consider these for cleanup
testNonServicePort = 3000
)
func TestCleanStaleEntries(t *testing.T) {
@ -74,19 +79,19 @@ func TestCleanStaleEntries(t *testing.T) {
Ports: []v1.ServicePort{
{
Name: "test-tcp",
Port: testPort,
Port: testServicePort,
Protocol: v1.ProtocolTCP,
},
{
Name: "test-udp",
Port: testPort,
NodePort: testNodePort,
Port: testServicePort,
NodePort: testServiceNodePort,
Protocol: v1.ProtocolUDP,
},
{
Name: "test-sctp",
Port: testPort,
NodePort: testNodePort,
Port: testServicePort,
NodePort: testServiceNodePort,
Protocol: v1.ProtocolSCTP,
},
},
@ -126,17 +131,17 @@ func TestCleanStaleEntries(t *testing.T) {
Ports: []discovery.EndpointPort{
{
Name: ptr.To("test-tcp"),
Port: ptr.To(int32(testPort)),
Port: ptr.To(int32(testServicePort)),
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("test-udp"),
Port: ptr.To(int32(testPort)),
Port: ptr.To(int32(testServicePort)),
Protocol: ptr.To(v1.ProtocolUDP),
},
{
Name: ptr.To("test-sctp"),
Port: ptr.To(int32(testPort)),
Port: ptr.To(int32(testServicePort)),
Protocol: ptr.To(v1.ProtocolSCTP),
},
},
@ -212,35 +217,47 @@ func TestCleanStaleEntries(t *testing.T) {
}
}
// mock existing entries before cleanup
// we create 36 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIPs) + 1 (NodePort))
var mockEntries []*netlink.ConntrackFlow
// expectedEntries are the entries on which we will assert the cleanup logic
var expectedEntries []*netlink.ConntrackFlow
// The following mock conntrack flow entries `entriesBeforeCleanup` and `entriesAfterCleanup`
// represent conntrack flow entries before and after reconciler cleanup loop. Before cleanup,
// reconciler lists the conntrack flows, receiving `entriesBeforeCleanup` and after cleanup,
// we list the conntrack flows and assert them to match with `entriesAfterCleanup`.
// {entriesBeforeCleanup} - {entriesAfterCleanup} = entries cleared by conntrack reconciler
var entriesBeforeCleanup []*netlink.ConntrackFlow
// entriesBeforeCleanup - entriesAfterCleanup = entries cleared by conntrack reconciler
var entriesAfterCleanup []*netlink.ConntrackFlow
// we create 63 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIP:ServicePort) + 3 (ServiceIP:NonServicePort) + 1 (NodePort))
for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} {
for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} {
for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(origDest),
Protocol: proto,
},
Reverse: netlink.IPTuple{
Protocol: proto,
SrcIP: netutils.ParseIPSloppy(dnatDest),
},
}
mockEntries = append(mockEntries, entry)
// we do not expect deleted or non-serving UDP endpoints flows to be present after cleanup
if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) {
expectedEntries = append(expectedEntries, entry)
for _, port := range []uint16{testServicePort, testNonServicePort} {
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(origDest),
DstPort: port,
Protocol: proto,
},
Reverse: netlink.IPTuple{
Protocol: proto,
SrcIP: netutils.ParseIPSloppy(dnatDest),
},
}
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
if proto == unix.IPPROTO_UDP && port == testServicePort && dnatDest != testServingEndpointIP {
// we do not expect UDP entries with destination port `testServicePort` and DNATed destination
// address not an address of serving endpoint to be present after cleanup.
} else {
entriesAfterCleanup = append(entriesAfterCleanup, entry)
}
}
}
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstPort: testNodePort,
DstPort: testServiceNodePort,
Protocol: proto,
},
Reverse: netlink.IPTuple{
@ -248,50 +265,264 @@ func TestCleanStaleEntries(t *testing.T) {
SrcIP: netutils.ParseIPSloppy(dnatDest),
},
}
mockEntries = append(mockEntries, entry)
// we do not expect deleted or non-serving UDP endpoints entries to be present after cleanup
if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) {
expectedEntries = append(expectedEntries, entry)
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
if proto == unix.IPPROTO_UDP && dnatDest != testServingEndpointIP {
// we do not expect UDP entries with DNATed destination address not
// an address of serving endpoint to be present after cleanup.
} else {
entriesAfterCleanup = append(entriesAfterCleanup, entry)
}
}
}
// add some non-DNATed mock entries which should be cleared up by reconciler
// add 6 non-DNATed mock entries which should be cleared up by reconciler
// These will exist if the proxy don't have DROP/REJECT rule for service with
// no endpoints, --orig-dst and --reply-src will be same for these entries.
for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(ip),
Protocol: unix.IPPROTO_UDP,
},
Reverse: netlink.IPTuple{
Protocol: unix.IPPROTO_UDP,
SrcIP: netutils.ParseIPSloppy(ip),
},
for _, port := range []uint16{testServicePort, testNonServicePort} {
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(ip),
DstPort: port,
Protocol: unix.IPPROTO_UDP,
},
Reverse: netlink.IPTuple{
Protocol: unix.IPPROTO_UDP,
SrcIP: netutils.ParseIPSloppy(ip),
},
}
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
// we do not expect entries with destination port `testServicePort` to be
// present after cleanup.
if port != testServicePort {
entriesAfterCleanup = append(entriesAfterCleanup, entry)
}
}
mockEntries = append(mockEntries, entry)
}
t.Logf("entries before cleanup %d after cleanup %d", len(entriesBeforeCleanup), len(entriesAfterCleanup))
fake := NewFake()
fake.entries = mockEntries
fake.entries = entriesBeforeCleanup
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
require.Equal(t, len(expectedEntries), len(actualEntries))
require.Equal(t, len(entriesAfterCleanup), len(actualEntries))
// sort the actual flows before comparison
sort.Slice(actualEntries, func(i, j int) bool {
return actualEntries[i].String() < actualEntries[j].String()
})
// sort the expected flows before comparison
sort.Slice(expectedEntries, func(i, j int) bool {
return expectedEntries[i].String() < expectedEntries[j].String()
sort.Slice(entriesAfterCleanup, func(i, j int) bool {
return entriesAfterCleanup[i].String() < entriesAfterCleanup[j].String()
})
for i := 0; i < len(expectedEntries); i++ {
require.Equal(t, expectedEntries[i], actualEntries[i])
if diff := cmp.Diff(entriesAfterCleanup, actualEntries); len(diff) > 0 {
t.Errorf("unexpected entries after cleanup: %s", diff)
}
}
func TestPerformanceCleanStaleEntries(t *testing.T) {
sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: testServiceName,
Namespace: testServiceNamespace,
},
Spec: v1.ServiceSpec{
ClusterIP: testClusterIP,
ExternalIPs: []string{testExternalIP},
Ports: []v1.ServicePort{
{
Name: "test-udp",
Port: testServicePort,
Protocol: v1.ProtocolUDP,
},
},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{{
IP: testLoadBalancerIP,
}},
},
},
}
sct.Update(nil, svc)
svcPortMap := make(proxy.ServicePortMap)
_ = svcPortMap.Update(sct)
ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil)
eps := &discovery.EndpointSlice{
TypeMeta: metav1.TypeMeta{},
AddressType: discovery.AddressTypeIPv4,
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-0", testServiceName),
Namespace: testServiceNamespace,
Labels: map[string]string{discovery.LabelServiceName: testServiceName},
},
Endpoints: []discovery.Endpoint{
{
Addresses: []string{testServingEndpointIP},
Conditions: discovery.EndpointConditions{Serving: ptr.To(true)},
},
{
Addresses: []string{testNonServingEndpointIP},
Conditions: discovery.EndpointConditions{Serving: ptr.To(false)},
},
},
Ports: []discovery.EndpointPort{
{
Name: ptr.To("test-udp"),
Port: ptr.To(int32(testServicePort)),
Protocol: ptr.To(v1.ProtocolUDP),
},
},
}
ect.EndpointSliceUpdate(eps, false)
endpointsMap := make(proxy.EndpointsMap)
_ = endpointsMap.Update(ect)
genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow {
return &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(testExternalIP),
DstPort: dstPort,
Protocol: unix.IPPROTO_UDP,
},
Reverse: netlink.IPTuple{
Protocol: unix.IPPROTO_UDP,
SrcIP: netutils.ParseIPSloppy(revSrc),
},
}
}
fake := NewFake()
// 1 valid entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP))
expectedEntries := 1
// 1 stale entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP))
expectedDeleted := 1
// 1 M to the Service IP with random ports
for i := 0; i < 1000*1000; i++ {
port := uint16(rand.Intn(65535))
if port == testServicePort {
expectedDeleted++
} else {
expectedEntries++
}
fake.entries = append(fake.entries, genConntrackEntry(port, testDeletedEndpointIP))
}
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
if len(actualEntries) != expectedEntries {
t.Errorf("unexpected number of entries, got %d expected %d", len(actualEntries), expectedEntries)
}
// expected conntrack entries
// 1 for CleanStaleEntries + 1 for ListEntries + 1 for ConntrackDeleteFilters to dump the conntrack table
// n for the expected deleted stale entries
t.Logf("expected deleted %d", expectedDeleted)
if fake.netlinkRequests != 3+expectedDeleted {
t.Errorf("expected %d netlink requests, got %d", 3+expectedDeleted, fake.netlinkRequests)
}
}
func TestServiceWithoutEndpoints(t *testing.T) {
sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: testServiceName,
Namespace: testServiceNamespace,
},
Spec: v1.ServiceSpec{
ClusterIP: testClusterIP,
ExternalIPs: []string{testExternalIP},
Ports: []v1.ServicePort{
{
Name: "test-udp",
Port: testServicePort,
Protocol: v1.ProtocolUDP,
},
},
},
Status: v1.ServiceStatus{
LoadBalancer: v1.LoadBalancerStatus{
Ingress: []v1.LoadBalancerIngress{{
IP: testLoadBalancerIP,
}},
},
},
}
sct.Update(nil, svc)
svcPortMap := make(proxy.ServicePortMap)
_ = svcPortMap.Update(sct)
ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil)
eps := &discovery.EndpointSlice{
TypeMeta: metav1.TypeMeta{},
AddressType: discovery.AddressTypeIPv4,
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-0", testServiceName),
Namespace: testServiceNamespace,
Labels: map[string]string{discovery.LabelServiceName: "non-existing-service"},
},
Endpoints: []discovery.Endpoint{
{
Addresses: []string{testServingEndpointIP},
Conditions: discovery.EndpointConditions{Serving: ptr.To(true)},
},
{
Addresses: []string{testNonServingEndpointIP},
Conditions: discovery.EndpointConditions{Serving: ptr.To(false)},
},
},
Ports: []discovery.EndpointPort{
{
Name: ptr.To("test-udp"),
Port: ptr.To(int32(testServicePort)),
Protocol: ptr.To(v1.ProtocolUDP),
},
},
}
ect.EndpointSliceUpdate(eps, false)
endpointsMap := make(proxy.EndpointsMap)
_ = endpointsMap.Update(ect)
genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow {
return &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(testExternalIP),
DstPort: dstPort,
Protocol: unix.IPPROTO_UDP,
},
Reverse: netlink.IPTuple{
Protocol: unix.IPPROTO_UDP,
SrcIP: netutils.ParseIPSloppy(revSrc),
},
}
}
fake := NewFake()
// 1 valid entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP))
// 1 stale entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP))
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
if len(actualEntries) != 2 {
t.Errorf("unexpected number of entries, got %d expected %d", len(actualEntries), 2)
}
}
@ -300,6 +531,7 @@ func TestFilterForNAT(t *testing.T) {
name string
orig string
dest string
dstPort uint16
protocol v1.Protocol
expectedFilter *conntrackFilter
}{
@ -329,7 +561,7 @@ func TestFilterForNAT(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedFilter, filterForNAT(tc.orig, tc.dest, tc.protocol))
require.Equal(t, tc.expectedFilter, filterForIPPortNAT(tc.orig, tc.dest, tc.dstPort, tc.protocol))
})
}
}

View File

@ -25,7 +25,8 @@ import (
// FakeInterface implements Interface by just recording entries that have been cleared.
type FakeInterface struct {
entries []*netlink.ConntrackFlow
entries []*netlink.ConntrackFlow
netlinkRequests int // try to get the estimated number of netlink request
}
var _ Interface = &FakeInterface{}
@ -37,18 +38,30 @@ func NewFake() *FakeInterface {
// ListEntries is part of Interface
func (fake *FakeInterface) ListEntries(_ uint8) ([]*netlink.ConntrackFlow, error) {
return fake.entries, nil
entries := make([]*netlink.ConntrackFlow, len(fake.entries))
copy(entries, fake.entries)
// 1 netlink request to dump the table
// https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L93-L94
fake.netlinkRequests++
return entries, nil
}
// ClearEntries is part of Interface
func (fake *FakeInterface) ClearEntries(_ uint8, filters ...netlink.CustomConntrackFilter) (int, error) {
var flows []*netlink.ConntrackFlow
before := len(fake.entries)
// 1 netlink request to dump the table
// https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L163
fake.netlinkRequests++
for _, flow := range fake.entries {
var matched bool
for _, filter := range filters {
matched = filter.MatchConntrackFlow(flow)
if matched {
// 1 netlink request to delete the flow
// https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L182
fake.netlinkRequests++
break
}
}