From 2a60692f5b6d97b100b7123e40767da56d7fa34a Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Thu, 27 Feb 2025 22:14:15 +0000 Subject: [PATCH] conntrack reconciler must check the dst port The conntrack reconciler maintains the consistency between the conntrack table on each node and the desired state of Kubernetes UDP services. A valid entry matches a service's ClusterIP, LoadBalancerIP, or ExternalIP and Service port, or any ip matching a NodePort, and has a reverse source IP matching an active endpoint for that service. Other entries are deleted. Services without endpoints and traffic not handled by kube-proxy are ignored Co-authored-by: Daman Arora --- pkg/proxy/conntrack/cleanup.go | 67 ++++-- pkg/proxy/conntrack/cleanup_test.go | 340 +++++++++++++++++++++++----- pkg/proxy/conntrack/fake.go | 17 +- 3 files changed, 346 insertions(+), 78 deletions(-) diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index e23cada043a..b125f687418 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -20,6 +20,8 @@ limitations under the License. package conntrack import ( + "net" + "strconv" "time" "github.com/vishvananda/netlink" @@ -32,8 +34,14 @@ import ( netutils "k8s.io/utils/net" ) +// Kubernetes UDP services can be affected by stale conntrack entries. +// These entries may point to endpoints that no longer exist, +// leading to packet loss and connectivity problems. + // CleanStaleEntries scans conntrack table and removes any entries // for a service that do not correspond to a serving endpoint. +// List existing conntrack entries and calculate the desired conntrack state +// based on the current Services and Endpoints. func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) { @@ -46,7 +54,7 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, return } - // serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) + // serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port // to the set of serving endpoint IPs. serviceIPEndpointIPs := make(map[string]sets.Set[string]) // serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs. @@ -73,14 +81,28 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, } } - serviceIPEndpointIPs[svc.ClusterIP().String()] = endpointIPs + // a Service without endpoints does not require to clean the conntrack entries associated. + if endpointIPs.Len() == 0 { + continue + } + + // we need to filter entries that are directed to a Service IP:Port frontend + // that does not have a backend as part of the endpoints IPs + portStr := strconv.Itoa(svc.Port()) + // clusterIP:Port + serviceIPEndpointIPs[net.JoinHostPort(svc.ClusterIP().String(), portStr)] = endpointIPs + // loadbalancerIP:Port for _, loadBalancerIP := range svc.LoadBalancerVIPs() { - serviceIPEndpointIPs[loadBalancerIP.String()] = endpointIPs + serviceIPEndpointIPs[net.JoinHostPort(loadBalancerIP.String(), portStr)] = endpointIPs } + // externalIP:Port for _, externalIP := range svc.ExternalIPs() { - serviceIPEndpointIPs[externalIP.String()] = endpointIPs + serviceIPEndpointIPs[net.JoinHostPort(externalIP.String(), portStr)] = endpointIPs } + // we need to filter entries that are directed to a *:NodePort + // that does not have a backend as part of the endpoints IPs if svc.NodePort() != 0 { + // *:NodePort serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs } } @@ -92,26 +114,25 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, continue } - origDst := entry.Forward.DstIP.String() - origPortDst := int(entry.Forward.DstPort) - replySrc := entry.Reverse.SrcIP.String() + origDst := entry.Forward.DstIP.String() // match Service IP + origPortDst := int(entry.Forward.DstPort) // match Service Port + origPortDstStr := strconv.Itoa(origPortDst) + replySrc := entry.Reverse.SrcIP.String() // match Serving Endpoint IP // if the original destination (--orig-dst) of the entry is service IP (ClusterIP, - // LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of - // any serving endpoint, we clear the entry. - if _, ok := serviceIPEndpointIPs[origDst]; ok { - if !serviceIPEndpointIPs[origDst].Has(replySrc) { - filters = append(filters, filterForNAT(origDst, replySrc, v1.ProtocolUDP)) - } + // LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) of the flow is service Port + // and the reply source (--reply-src) is not IP of any serving endpoint, we clear the entry. + endpoints, ok := serviceIPEndpointIPs[net.JoinHostPort(origDst, origPortDstStr)] + if ok && !endpoints.Has(replySrc) { + filters = append(filters, filterForIPPortNAT(origDst, replySrc, entry.Forward.DstPort, v1.ProtocolUDP)) } // if the original port destination (--orig-port-dst) of the flow is service // NodePort and the reply source (--reply-src) is not IP of any serving endpoint, // we clear the entry. - if _, ok := serviceNodePortEndpointIPs[origPortDst]; ok { - if !serviceNodePortEndpointIPs[origPortDst].Has(replySrc) { - filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP)) - } + endpoints, ok = serviceNodePortEndpointIPs[origPortDst] + if ok && !endpoints.Has(replySrc) { + filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP)) } } @@ -136,14 +157,16 @@ var protocolMap = map[v1.Protocol]uint8{ v1.ProtocolSCTP: unix.IPPROTO_SCTP, } -// filterForNAT returns *conntrackFilter to delete the conntrack entries for connections -// specified by the destination IP (original direction) and source IP (reply direction). -func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter { - klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol) +// filterForIPPortNAT returns *conntrackFilter to delete the conntrack entries for connections +// specified by the destination IP (original direction) and destination port (original direction) +// and source IP (reply direction). +func filterForIPPortNAT(origin, dest string, dstPort uint16, protocol v1.Protocol) *conntrackFilter { + klog.V(6).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol) return &conntrackFilter{ protocol: protocolMap[protocol], original: &connectionTuple{ - dstIP: netutils.ParseIPSloppy(origin), + dstIP: netutils.ParseIPSloppy(origin), + dstPort: dstPort, }, reply: &connectionTuple{ srcIP: netutils.ParseIPSloppy(dest), diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go index d6232a02b3a..7d41f3ee7d3 100644 --- a/pkg/proxy/conntrack/cleanup_test.go +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -21,9 +21,11 @@ package conntrack import ( "fmt" + "math/rand" "sort" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" @@ -50,8 +52,11 @@ const ( testNonServingEndpointIP = "10.240.1.5" testDeletedEndpointIP = "10.240.2.6" - testPort = 8000 - testNodePort = 32000 + testServicePort = 8000 + testServiceNodePort = 32000 + // testNonServicePort is used to mock conntrack flow entries which are not owned by + // kube-proxy and reconciler should not consider these for cleanup + testNonServicePort = 3000 ) func TestCleanStaleEntries(t *testing.T) { @@ -74,19 +79,19 @@ func TestCleanStaleEntries(t *testing.T) { Ports: []v1.ServicePort{ { Name: "test-tcp", - Port: testPort, + Port: testServicePort, Protocol: v1.ProtocolTCP, }, { Name: "test-udp", - Port: testPort, - NodePort: testNodePort, + Port: testServicePort, + NodePort: testServiceNodePort, Protocol: v1.ProtocolUDP, }, { Name: "test-sctp", - Port: testPort, - NodePort: testNodePort, + Port: testServicePort, + NodePort: testServiceNodePort, Protocol: v1.ProtocolSCTP, }, }, @@ -126,17 +131,17 @@ func TestCleanStaleEntries(t *testing.T) { Ports: []discovery.EndpointPort{ { Name: ptr.To("test-tcp"), - Port: ptr.To(int32(testPort)), + Port: ptr.To(int32(testServicePort)), Protocol: ptr.To(v1.ProtocolTCP), }, { Name: ptr.To("test-udp"), - Port: ptr.To(int32(testPort)), + Port: ptr.To(int32(testServicePort)), Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("test-sctp"), - Port: ptr.To(int32(testPort)), + Port: ptr.To(int32(testServicePort)), Protocol: ptr.To(v1.ProtocolSCTP), }, }, @@ -212,35 +217,47 @@ func TestCleanStaleEntries(t *testing.T) { } } - // mock existing entries before cleanup - // we create 36 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIPs) + 1 (NodePort)) - var mockEntries []*netlink.ConntrackFlow - // expectedEntries are the entries on which we will assert the cleanup logic - var expectedEntries []*netlink.ConntrackFlow + // The following mock conntrack flow entries `entriesBeforeCleanup` and `entriesAfterCleanup` + // represent conntrack flow entries before and after reconciler cleanup loop. Before cleanup, + // reconciler lists the conntrack flows, receiving `entriesBeforeCleanup` and after cleanup, + // we list the conntrack flows and assert them to match with `entriesAfterCleanup`. + // {entriesBeforeCleanup} - {entriesAfterCleanup} = entries cleared by conntrack reconciler + var entriesBeforeCleanup []*netlink.ConntrackFlow + // entriesBeforeCleanup - entriesAfterCleanup = entries cleared by conntrack reconciler + var entriesAfterCleanup []*netlink.ConntrackFlow + + // we create 63 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIP:ServicePort) + 3 (ServiceIP:NonServicePort) + 1 (NodePort)) for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} { for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} { + for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} { - entry := &netlink.ConntrackFlow{ - FamilyType: unix.AF_INET, - Forward: netlink.IPTuple{ - DstIP: netutils.ParseIPSloppy(origDest), - Protocol: proto, - }, - Reverse: netlink.IPTuple{ - Protocol: proto, - SrcIP: netutils.ParseIPSloppy(dnatDest), - }, - } - mockEntries = append(mockEntries, entry) - // we do not expect deleted or non-serving UDP endpoints flows to be present after cleanup - if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) { - expectedEntries = append(expectedEntries, entry) + for _, port := range []uint16{testServicePort, testNonServicePort} { + entry := &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstIP: netutils.ParseIPSloppy(origDest), + DstPort: port, + Protocol: proto, + }, + Reverse: netlink.IPTuple{ + Protocol: proto, + SrcIP: netutils.ParseIPSloppy(dnatDest), + }, + } + entriesBeforeCleanup = append(entriesBeforeCleanup, entry) + if proto == unix.IPPROTO_UDP && port == testServicePort && dnatDest != testServingEndpointIP { + // we do not expect UDP entries with destination port `testServicePort` and DNATed destination + // address not an address of serving endpoint to be present after cleanup. + } else { + entriesAfterCleanup = append(entriesAfterCleanup, entry) + } } } + entry := &netlink.ConntrackFlow{ FamilyType: unix.AF_INET, Forward: netlink.IPTuple{ - DstPort: testNodePort, + DstPort: testServiceNodePort, Protocol: proto, }, Reverse: netlink.IPTuple{ @@ -248,50 +265,264 @@ func TestCleanStaleEntries(t *testing.T) { SrcIP: netutils.ParseIPSloppy(dnatDest), }, } - mockEntries = append(mockEntries, entry) - // we do not expect deleted or non-serving UDP endpoints entries to be present after cleanup - if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) { - expectedEntries = append(expectedEntries, entry) + entriesBeforeCleanup = append(entriesBeforeCleanup, entry) + if proto == unix.IPPROTO_UDP && dnatDest != testServingEndpointIP { + // we do not expect UDP entries with DNATed destination address not + // an address of serving endpoint to be present after cleanup. + } else { + entriesAfterCleanup = append(entriesAfterCleanup, entry) } } } - // add some non-DNATed mock entries which should be cleared up by reconciler + // add 6 non-DNATed mock entries which should be cleared up by reconciler // These will exist if the proxy don't have DROP/REJECT rule for service with // no endpoints, --orig-dst and --reply-src will be same for these entries. for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} { - entry := &netlink.ConntrackFlow{ - FamilyType: unix.AF_INET, - Forward: netlink.IPTuple{ - DstIP: netutils.ParseIPSloppy(ip), - Protocol: unix.IPPROTO_UDP, - }, - Reverse: netlink.IPTuple{ - Protocol: unix.IPPROTO_UDP, - SrcIP: netutils.ParseIPSloppy(ip), - }, + for _, port := range []uint16{testServicePort, testNonServicePort} { + entry := &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstIP: netutils.ParseIPSloppy(ip), + DstPort: port, + Protocol: unix.IPPROTO_UDP, + }, + Reverse: netlink.IPTuple{ + Protocol: unix.IPPROTO_UDP, + SrcIP: netutils.ParseIPSloppy(ip), + }, + } + + entriesBeforeCleanup = append(entriesBeforeCleanup, entry) + // we do not expect entries with destination port `testServicePort` to be + // present after cleanup. + if port != testServicePort { + entriesAfterCleanup = append(entriesAfterCleanup, entry) + } + } - mockEntries = append(mockEntries, entry) } + t.Logf("entries before cleanup %d after cleanup %d", len(entriesBeforeCleanup), len(entriesAfterCleanup)) fake := NewFake() - fake.entries = mockEntries + fake.entries = entriesBeforeCleanup CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap) actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily]) - require.Equal(t, len(expectedEntries), len(actualEntries)) + require.Equal(t, len(entriesAfterCleanup), len(actualEntries)) // sort the actual flows before comparison sort.Slice(actualEntries, func(i, j int) bool { return actualEntries[i].String() < actualEntries[j].String() }) // sort the expected flows before comparison - sort.Slice(expectedEntries, func(i, j int) bool { - return expectedEntries[i].String() < expectedEntries[j].String() + sort.Slice(entriesAfterCleanup, func(i, j int) bool { + return entriesAfterCleanup[i].String() < entriesAfterCleanup[j].String() }) - for i := 0; i < len(expectedEntries); i++ { - require.Equal(t, expectedEntries[i], actualEntries[i]) + if diff := cmp.Diff(entriesAfterCleanup, actualEntries); len(diff) > 0 { + t.Errorf("unexpected entries after cleanup: %s", diff) + } +} + +func TestPerformanceCleanStaleEntries(t *testing.T) { + sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil) + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName, + Namespace: testServiceNamespace, + }, + Spec: v1.ServiceSpec{ + ClusterIP: testClusterIP, + ExternalIPs: []string{testExternalIP}, + Ports: []v1.ServicePort{ + { + Name: "test-udp", + Port: testServicePort, + Protocol: v1.ProtocolUDP, + }, + }, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{ + IP: testLoadBalancerIP, + }}, + }, + }, + } + + sct.Update(nil, svc) + svcPortMap := make(proxy.ServicePortMap) + _ = svcPortMap.Update(sct) + + ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil) + eps := &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{}, + AddressType: discovery.AddressTypeIPv4, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0", testServiceName), + Namespace: testServiceNamespace, + Labels: map[string]string{discovery.LabelServiceName: testServiceName}, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{testServingEndpointIP}, + Conditions: discovery.EndpointConditions{Serving: ptr.To(true)}, + }, + { + Addresses: []string{testNonServingEndpointIP}, + Conditions: discovery.EndpointConditions{Serving: ptr.To(false)}, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: ptr.To("test-udp"), + Port: ptr.To(int32(testServicePort)), + Protocol: ptr.To(v1.ProtocolUDP), + }, + }, + } + + ect.EndpointSliceUpdate(eps, false) + endpointsMap := make(proxy.EndpointsMap) + _ = endpointsMap.Update(ect) + + genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow { + return &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstIP: netutils.ParseIPSloppy(testExternalIP), + DstPort: dstPort, + Protocol: unix.IPPROTO_UDP, + }, + Reverse: netlink.IPTuple{ + Protocol: unix.IPPROTO_UDP, + SrcIP: netutils.ParseIPSloppy(revSrc), + }, + } + } + + fake := NewFake() + // 1 valid entry + fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP)) + expectedEntries := 1 + // 1 stale entry + fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP)) + expectedDeleted := 1 + // 1 M to the Service IP with random ports + for i := 0; i < 1000*1000; i++ { + port := uint16(rand.Intn(65535)) + if port == testServicePort { + expectedDeleted++ + } else { + expectedEntries++ + } + fake.entries = append(fake.entries, genConntrackEntry(port, testDeletedEndpointIP)) + } + + CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap) + actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily]) + if len(actualEntries) != expectedEntries { + t.Errorf("unexpected number of entries, got %d expected %d", len(actualEntries), expectedEntries) + } + // expected conntrack entries + // 1 for CleanStaleEntries + 1 for ListEntries + 1 for ConntrackDeleteFilters to dump the conntrack table + // n for the expected deleted stale entries + t.Logf("expected deleted %d", expectedDeleted) + if fake.netlinkRequests != 3+expectedDeleted { + t.Errorf("expected %d netlink requests, got %d", 3+expectedDeleted, fake.netlinkRequests) + } +} + +func TestServiceWithoutEndpoints(t *testing.T) { + sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil) + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName, + Namespace: testServiceNamespace, + }, + Spec: v1.ServiceSpec{ + ClusterIP: testClusterIP, + ExternalIPs: []string{testExternalIP}, + Ports: []v1.ServicePort{ + { + Name: "test-udp", + Port: testServicePort, + Protocol: v1.ProtocolUDP, + }, + }, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{ + IP: testLoadBalancerIP, + }}, + }, + }, + } + + sct.Update(nil, svc) + svcPortMap := make(proxy.ServicePortMap) + _ = svcPortMap.Update(sct) + + ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil) + eps := &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{}, + AddressType: discovery.AddressTypeIPv4, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0", testServiceName), + Namespace: testServiceNamespace, + Labels: map[string]string{discovery.LabelServiceName: "non-existing-service"}, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{testServingEndpointIP}, + Conditions: discovery.EndpointConditions{Serving: ptr.To(true)}, + }, + { + Addresses: []string{testNonServingEndpointIP}, + Conditions: discovery.EndpointConditions{Serving: ptr.To(false)}, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: ptr.To("test-udp"), + Port: ptr.To(int32(testServicePort)), + Protocol: ptr.To(v1.ProtocolUDP), + }, + }, + } + + ect.EndpointSliceUpdate(eps, false) + endpointsMap := make(proxy.EndpointsMap) + _ = endpointsMap.Update(ect) + + genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow { + return &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstIP: netutils.ParseIPSloppy(testExternalIP), + DstPort: dstPort, + Protocol: unix.IPPROTO_UDP, + }, + Reverse: netlink.IPTuple{ + Protocol: unix.IPPROTO_UDP, + SrcIP: netutils.ParseIPSloppy(revSrc), + }, + } + } + + fake := NewFake() + // 1 valid entry + fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP)) + // 1 stale entry + fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP)) + + CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap) + actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily]) + if len(actualEntries) != 2 { + t.Errorf("unexpected number of entries, got %d expected %d", len(actualEntries), 2) } } @@ -300,6 +531,7 @@ func TestFilterForNAT(t *testing.T) { name string orig string dest string + dstPort uint16 protocol v1.Protocol expectedFilter *conntrackFilter }{ @@ -329,7 +561,7 @@ func TestFilterForNAT(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedFilter, filterForNAT(tc.orig, tc.dest, tc.protocol)) + require.Equal(t, tc.expectedFilter, filterForIPPortNAT(tc.orig, tc.dest, tc.dstPort, tc.protocol)) }) } } diff --git a/pkg/proxy/conntrack/fake.go b/pkg/proxy/conntrack/fake.go index bedbb99b6e9..8902da7a6f2 100644 --- a/pkg/proxy/conntrack/fake.go +++ b/pkg/proxy/conntrack/fake.go @@ -25,7 +25,8 @@ import ( // FakeInterface implements Interface by just recording entries that have been cleared. type FakeInterface struct { - entries []*netlink.ConntrackFlow + entries []*netlink.ConntrackFlow + netlinkRequests int // try to get the estimated number of netlink request } var _ Interface = &FakeInterface{} @@ -37,18 +38,30 @@ func NewFake() *FakeInterface { // ListEntries is part of Interface func (fake *FakeInterface) ListEntries(_ uint8) ([]*netlink.ConntrackFlow, error) { - return fake.entries, nil + entries := make([]*netlink.ConntrackFlow, len(fake.entries)) + copy(entries, fake.entries) + // 1 netlink request to dump the table + // https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L93-L94 + fake.netlinkRequests++ + return entries, nil } // ClearEntries is part of Interface func (fake *FakeInterface) ClearEntries(_ uint8, filters ...netlink.CustomConntrackFilter) (int, error) { var flows []*netlink.ConntrackFlow before := len(fake.entries) + // 1 netlink request to dump the table + // https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L163 + fake.netlinkRequests++ + for _, flow := range fake.entries { var matched bool for _, filter := range filters { matched = filter.MatchConntrackFlow(flow) if matched { + // 1 netlink request to delete the flow + // https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L182 + fake.netlinkRequests++ break } }