From 1ad8880c0fec8b208cb3678bc67628b6c5fc4f8a Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Wed, 11 Sep 2024 14:57:57 +0530 Subject: [PATCH] proxy/conntrack: reconciler Signed-off-by: Daman Arora --- pkg/proxy/conntrack/cleanup.go | 162 +++++------ pkg/proxy/conntrack/cleanup_test.go | 402 ++++++++++++-------------- pkg/proxy/conntrack/conntrack.go | 10 +- pkg/proxy/conntrack/conntrack_test.go | 5 +- pkg/proxy/conntrack/fake.go | 69 ++--- pkg/proxy/iptables/proxier.go | 2 +- pkg/proxy/ipvs/proxier.go | 4 +- pkg/proxy/nftables/proxier.go | 2 +- 8 files changed, 287 insertions(+), 369 deletions(-) diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index 93f4ccdc585..e23cada043a 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -20,6 +20,8 @@ limitations under the License. package conntrack import ( + "time" + "github.com/vishvananda/netlink" "golang.org/x/sys/unix" @@ -27,88 +29,96 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" netutils "k8s.io/utils/net" ) -// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints. -func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, - serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - deleteStaleServiceConntrackEntries(ct, ipFamily, svcPortMap, serviceUpdateResult, endpointsUpdateResult) - deleteStaleEndpointConntrackEntries(ct, ipFamily, svcPortMap, endpointsUpdateResult) -} +// CleanStaleEntries scans conntrack table and removes any entries +// for a service that do not correspond to a serving endpoint. +func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, + svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) { -// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related -// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack -// may create "black hole" entries for that IP+port. When the service gets endpoints we -// need to delete those entries so further traffic doesn't get dropped. -func deleteStaleServiceConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - var filters []netlink.CustomConntrackFilter - conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs - conntrackCleanupServiceNodePorts := sets.New[int]() + start := time.Now() + klog.V(4).InfoS("Started to reconcile conntrack entries", "ipFamily", ipFamily) - // merge newly active services gathered from endpointsUpdateResult - // a UDP service that changes from 0 to non-0 endpoints is newly active. - for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices { - if svcInfo, ok := svcPortMap[svcPortName]; ok { - klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) - conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) - for _, extIP := range svcInfo.ExternalIPs() { - conntrackCleanupServiceIPs.Insert(extIP.String()) - } - for _, lbIP := range svcInfo.LoadBalancerVIPs() { - conntrackCleanupServiceIPs.Insert(lbIP.String()) - } - nodePort := svcInfo.NodePort() - if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { - conntrackCleanupServiceNodePorts.Insert(nodePort) + entries, err := ct.ListEntries(ipFamilyMap[ipFamily]) + if err != nil { + klog.ErrorS(err, "Failed to list conntrack entries") + return + } + + // serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) + // to the set of serving endpoint IPs. + serviceIPEndpointIPs := make(map[string]sets.Set[string]) + // serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs. + serviceNodePortEndpointIPs := make(map[int]sets.Set[string]) + + for svcName, svc := range svcPortMap { + // we are only interested in UDP services + if svc.Protocol() != v1.ProtocolUDP { + continue + } + + endpointIPs := sets.New[string]() + for _, endpoint := range endpointsMap[svcName] { + // We need to remove all the conntrack entries for a Service (IP or NodePort) + // that are not pointing to a serving endpoint. + // We map all the serving endpoint IPs to the service and clear all the conntrack + // entries which are destined for the service and are not DNATed to these endpoints. + // Changes to the service should not affect existing flows, so we do not take + // traffic policies, topology, or terminating status of the service into account. + // This ensures that the behavior of UDP services remains consistent with TCP + // services. + if endpoint.IsServing() { + endpointIPs.Insert(endpoint.IP()) } } + + serviceIPEndpointIPs[svc.ClusterIP().String()] = endpointIPs + for _, loadBalancerIP := range svc.LoadBalancerVIPs() { + serviceIPEndpointIPs[loadBalancerIP.String()] = endpointIPs + } + for _, externalIP := range svc.ExternalIPs() { + serviceIPEndpointIPs[externalIP.String()] = endpointIPs + } + if svc.NodePort() != 0 { + serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs + } } - klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList()) - for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { - filters = append(filters, filterForIP(svcIP, v1.ProtocolUDP)) - } - klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList()) - for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { - filters = append(filters, filterForPort(nodePort, v1.ProtocolUDP)) - } - - if n, err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil { - klog.ErrorS(err, "Failed to delete stale service connections") - } else { - klog.V(4).InfoS("Deleted conntrack stale entries for services", "count", n) - } -} - -// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related -// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries -// for it so that if the same client keeps sending, the packets will get routed to a new endpoint. -func deleteStaleEndpointConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { var filters []netlink.CustomConntrackFilter - for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints { - if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok { - endpointIP := proxyutil.IPPart(epSvcPair.Endpoint) - nodePort := svcInfo.NodePort() - if nodePort != 0 { - filters = append(filters, filterForPortNAT(endpointIP, nodePort, v1.ProtocolUDP)) + for _, entry := range entries { + // we only deal with UDP protocol entries + if entry.Forward.Protocol != unix.IPPROTO_UDP { + continue + } + origDst := entry.Forward.DstIP.String() + origPortDst := int(entry.Forward.DstPort) + replySrc := entry.Reverse.SrcIP.String() + + // if the original destination (--orig-dst) of the entry is service IP (ClusterIP, + // LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of + // any serving endpoint, we clear the entry. + if _, ok := serviceIPEndpointIPs[origDst]; ok { + if !serviceIPEndpointIPs[origDst].Has(replySrc) { + filters = append(filters, filterForNAT(origDst, replySrc, v1.ProtocolUDP)) } - filters = append(filters, filterForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)) - for _, extIP := range svcInfo.ExternalIPs() { - filters = append(filters, filterForNAT(extIP.String(), endpointIP, v1.ProtocolUDP)) - } - for _, lbIP := range svcInfo.LoadBalancerVIPs() { - filters = append(filters, filterForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP)) + } + + // if the original port destination (--orig-port-dst) of the flow is service + // NodePort and the reply source (--reply-src) is not IP of any serving endpoint, + // we clear the entry. + if _, ok := serviceNodePortEndpointIPs[origPortDst]; ok { + if !serviceNodePortEndpointIPs[origPortDst].Has(replySrc) { + filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP)) } } } if n, err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil { - klog.ErrorS(err, "Failed to delete stale endpoint connections") + klog.ErrorS(err, "Failed to clear all conntrack entries", "ipFamily", ipFamily, "entriesDeleted", n, "took", time.Since(start)) } else { - klog.V(4).InfoS("Deleted conntrack stale entries for endpoints", "count", n) + klog.V(4).InfoS("Finished reconciling conntrack entries", "ipFamily", ipFamily, "entriesDeleted", n, "took", time.Since(start)) } } @@ -126,30 +136,6 @@ var protocolMap = map[v1.Protocol]uint8{ v1.ProtocolSCTP: unix.IPPROTO_SCTP, } -// filterForIP returns *conntrackFilter to delete the conntrack entries for connections -// specified by the destination IP (original direction). -func filterForIP(ip string, protocol v1.Protocol) *conntrackFilter { - klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", ip, "protocol", protocol) - return &conntrackFilter{ - protocol: protocolMap[protocol], - original: &connectionTuple{ - dstIP: netutils.ParseIPSloppy(ip), - }, - } -} - -// filterForPort returns *conntrackFilter to delete the conntrack entries for connections -// specified by the destination Port (original direction). -func filterForPort(port int, protocol v1.Protocol) *conntrackFilter { - klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-port-dst", port, "protocol", protocol) - return &conntrackFilter{ - protocol: protocolMap[protocol], - original: &connectionTuple{ - dstPort: uint16(port), - }, - } -} - // filterForNAT returns *conntrackFilter to delete the conntrack entries for connections // specified by the destination IP (original direction) and source IP (reply direction). func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter { diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go index 134f4d724d1..d6232a02b3a 100644 --- a/pkg/proxy/conntrack/cleanup_test.go +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -20,62 +20,75 @@ limitations under the License. package conntrack import ( - "net" - "reflect" + "fmt" + "sort" "testing" "github.com/stretchr/testify/require" "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy" netutils "k8s.io/utils/net" + "k8s.io/utils/ptr" ) const ( + testServiceName = "cleanup-test" + testServiceNamespace = "test" + testIPFamily = v1.IPv4Protocol testClusterIP = "172.30.1.1" testExternalIP = "192.168.99.100" testLoadBalancerIP = "1.2.3.4" - testEndpointIP = "10.240.0.4" + testServingEndpointIP = "10.240.0.4" + testNonServingEndpointIP = "10.240.1.5" + testDeletedEndpointIP = "10.240.2.6" - testPort = 53 - testNodePort = 5353 - testEndpointPort = "5300" + testPort = 8000 + testNodePort = 32000 ) func TestCleanStaleEntries(t *testing.T) { - // We need to construct a proxy.ServicePortMap to pass to CleanStaleEntries. - // ServicePortMap is just map[string]proxy.ServicePort, but there are no public - // constructors for any implementation of proxy.ServicePort, so we have to either - // provide our own implementation of that interface, or else use a - // proxy.ServiceChangeTracker to construct them and fill in the map for us. + // We need to construct proxy.ServicePortMap and proxy.EndpointsMap to pass to + // CleanStaleEntries. ServicePortMap and EndpointsMap are just maps, but there are + // no public constructors for any implementation of proxy.ServicePort and + // proxy.EndpointsMap, so we have to either provide our own implementation of that + // interface, or else use a proxy.ServiceChangeTracker and proxy.NewEndpointsChangeTracker + // to construct them and fill in the maps for us. sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil) svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "cleanup-test", - Namespace: "test", + Name: testServiceName, + Namespace: testServiceNamespace, }, Spec: v1.ServiceSpec{ ClusterIP: testClusterIP, ExternalIPs: []string{testExternalIP}, Ports: []v1.ServicePort{ { - Name: "dns-tcp", + Name: "test-tcp", Port: testPort, Protocol: v1.ProtocolTCP, }, { - Name: "dns-udp", + Name: "test-udp", Port: testPort, NodePort: testNodePort, Protocol: v1.ProtocolUDP, }, + { + Name: "test-sctp", + Port: testPort, + NodePort: testNodePort, + Protocol: v1.ProtocolSCTP, + }, }, }, Status: v1.ServiceStatus{ @@ -86,14 +99,52 @@ func TestCleanStaleEntries(t *testing.T) { }, }, } - sct.Update(nil, svc) + sct.Update(nil, svc) svcPortMap := make(proxy.ServicePortMap) _ = svcPortMap.Update(sct) - // (At this point we are done with sct, and in particular, we don't use sct to - // construct UpdateServiceMapResults, because pkg/proxy already has its own tests - // for that. Also, svcPortMap is read-only from this point on.) + ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil) + eps := &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{}, + AddressType: discovery.AddressTypeIPv4, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-0", testServiceName), + Namespace: testServiceNamespace, + Labels: map[string]string{discovery.LabelServiceName: testServiceName}, + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{testServingEndpointIP}, + Conditions: discovery.EndpointConditions{Serving: ptr.To(true)}, + }, + { + Addresses: []string{testNonServingEndpointIP}, + Conditions: discovery.EndpointConditions{Serving: ptr.To(false)}, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: ptr.To("test-tcp"), + Port: ptr.To(int32(testPort)), + Protocol: ptr.To(v1.ProtocolTCP), + }, + { + Name: ptr.To("test-udp"), + Port: ptr.To(int32(testPort)), + Protocol: ptr.To(v1.ProtocolUDP), + }, + { + Name: ptr.To("test-sctp"), + Port: ptr.To(int32(testPort)), + Protocol: ptr.To(v1.ProtocolSCTP), + }, + }, + } + + ect.EndpointSliceUpdate(eps, false) + endpointsMap := make(proxy.EndpointsMap) + _ = endpointsMap.Update(ect) tcpPortName := proxy.ServicePortName{ NamespacedName: types.NamespacedName{ @@ -113,227 +164,134 @@ func TestCleanStaleEntries(t *testing.T) { Protocol: svc.Spec.Ports[1].Protocol, } - unknownPortName := udpPortName - unknownPortName.Namespace = "unknown" + sctpPortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: svc.Namespace, + Name: svc.Name, + }, + Port: svc.Spec.Ports[2].Name, + Protocol: svc.Spec.Ports[2].Protocol, + } - // Sanity-check to make sure we constructed the map correctly - if len(svcPortMap) != 2 { + // Sanity-check to make sure we constructed the ServicePortMap correctly + if len(svcPortMap) != 3 { t.Fatalf("expected svcPortMap to have 2 entries, got %+v", svcPortMap) } servicePort := svcPortMap[tcpPortName] - if servicePort == nil || servicePort.String() != "172.30.1.1:53/TCP" { - t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/TCP\", got %q", tcpPortName.String(), servicePort.String()) + if servicePort == nil || servicePort.String() != "172.30.1.1:8000/TCP" { + t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:8000/TCP\", got %q", tcpPortName.String(), servicePort.String()) } servicePort = svcPortMap[udpPortName] - if servicePort == nil || servicePort.String() != "172.30.1.1:53/UDP" { - t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/UDP\", got %q", udpPortName.String(), servicePort.String()) + if servicePort == nil || servicePort.String() != "172.30.1.1:8000/UDP" { + t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:8000/UDP\", got %q", udpPortName.String(), servicePort.String()) + } + servicePort = svcPortMap[sctpPortName] + if servicePort == nil || servicePort.String() != "172.30.1.1:8000/SCTP" { + t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:8000/SCTP\", got %q", sctpPortName.String(), servicePort.String()) } - testCases := []struct { - description string - - serviceUpdates proxy.UpdateServiceMapResult - endpointsUpdates proxy.UpdateEndpointsMapResult - - result FakeInterface - }{ - { - description: "DeletedUDPClusterIPs clears entries for given clusterIPs (only)", - - serviceUpdates: proxy.UpdateServiceMapResult{ - // Note: this isn't testClusterIP; it's the IP of some - // unknown (because deleted) service. - DeletedUDPClusterIPs: sets.New("172.30.99.99"), - }, - endpointsUpdates: proxy.UpdateEndpointsMapResult{}, - - result: FakeInterface{ - ClearedIPs: sets.New("172.30.99.99"), - - ClearedPorts: sets.New[int](), - ClearedNATs: map[string]string{}, - ClearedPortNATs: map[int]string{}, - }, - }, - { - description: "DeletedUDPEndpoints clears NAT entries for all IPs and NodePorts", - - serviceUpdates: proxy.UpdateServiceMapResult{ - DeletedUDPClusterIPs: sets.New[string](), - }, - endpointsUpdates: proxy.UpdateEndpointsMapResult{ - DeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: net.JoinHostPort(testEndpointIP, testEndpointPort), - ServicePortName: udpPortName, - }}, - }, - - result: FakeInterface{ - ClearedIPs: sets.New[string](), - ClearedPorts: sets.New[int](), - - ClearedNATs: map[string]string{ - testClusterIP: testEndpointIP, - testExternalIP: testEndpointIP, - testLoadBalancerIP: testEndpointIP, - }, - ClearedPortNATs: map[int]string{ - testNodePort: testEndpointIP, - }, - }, - }, - { - description: "NewlyActiveUDPServices clears entries for all IPs and NodePorts", - - serviceUpdates: proxy.UpdateServiceMapResult{ - DeletedUDPClusterIPs: sets.New[string](), - }, - endpointsUpdates: proxy.UpdateEndpointsMapResult{ - DeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - NewlyActiveUDPServices: []proxy.ServicePortName{ - udpPortName, - }, - }, - - result: FakeInterface{ - ClearedIPs: sets.New(testClusterIP, testExternalIP, testLoadBalancerIP), - ClearedPorts: sets.New(testNodePort), - - ClearedNATs: map[string]string{}, - ClearedPortNATs: map[int]string{}, - }, - }, - - { - description: "DeletedUDPEndpoints for unknown Service has no effect", - - serviceUpdates: proxy.UpdateServiceMapResult{ - DeletedUDPClusterIPs: sets.New[string](), - }, - endpointsUpdates: proxy.UpdateEndpointsMapResult{ - DeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.240.0.4:80", - ServicePortName: unknownPortName, - }}, - NewlyActiveUDPServices: []proxy.ServicePortName{}, - }, - - result: FakeInterface{ - ClearedIPs: sets.New[string](), - ClearedPorts: sets.New[int](), - ClearedNATs: map[string]string{}, - ClearedPortNATs: map[int]string{}, - }, - }, - { - description: "NewlyActiveUDPServices for unknown Service has no effect", - - serviceUpdates: proxy.UpdateServiceMapResult{ - DeletedUDPClusterIPs: sets.New[string](), - }, - endpointsUpdates: proxy.UpdateEndpointsMapResult{ - DeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - NewlyActiveUDPServices: []proxy.ServicePortName{ - unknownPortName, - }, - }, - - result: FakeInterface{ - ClearedIPs: sets.New[string](), - ClearedPorts: sets.New[int](), - ClearedNATs: map[string]string{}, - ClearedPortNATs: map[int]string{}, - }, - }, + // Sanity-check to make sure we constructed the EndpointsMap map correctly + if len(endpointsMap) != 3 { + t.Fatalf("expected endpointsMap to have 3 entries, got %+v", endpointsMap) + } + for _, svcPortName := range []proxy.ServicePortName{tcpPortName, udpPortName, sctpPortName} { + if len(endpointsMap[svcPortName]) != 2 { + t.Fatalf("expected endpointsMap[%q] to have 2 entries, got %+v", svcPortName.String(), endpointsMap[svcPortName]) + } + if endpointsMap[svcPortName][0].IP() != "10.240.0.4" { + t.Fatalf("expected endpointsMap[%q][0] IP to be \"10.240.0.4\", got \"%s\"", svcPortName.String(), endpointsMap[svcPortName][0].IP()) + } + if endpointsMap[svcPortName][1].IP() != "10.240.1.5" { + t.Fatalf("expected endpointsMap[%q][1] IP to be \"10.240.1.5\", got \"%s\"", svcPortName.String(), endpointsMap[svcPortName][1].IP()) + } + if !endpointsMap[svcPortName][0].IsServing() { + t.Fatalf("expected endpointsMap[%q][0] to be serving", svcPortName.String()) + } + if endpointsMap[svcPortName][1].IsServing() { + t.Fatalf("expected endpointsMap[%q][1] to be not serving", svcPortName.String()) + } } - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - fake := NewFake() - CleanStaleEntries(fake, testIPFamily, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates) - if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) { - t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs) + // mock existing entries before cleanup + // we create 36 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIPs) + 1 (NodePort)) + var mockEntries []*netlink.ConntrackFlow + // expectedEntries are the entries on which we will assert the cleanup logic + var expectedEntries []*netlink.ConntrackFlow + for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} { + for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} { + for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} { + entry := &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstIP: netutils.ParseIPSloppy(origDest), + Protocol: proto, + }, + Reverse: netlink.IPTuple{ + Protocol: proto, + SrcIP: netutils.ParseIPSloppy(dnatDest), + }, + } + mockEntries = append(mockEntries, entry) + // we do not expect deleted or non-serving UDP endpoints flows to be present after cleanup + if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) { + expectedEntries = append(expectedEntries, entry) + } } - if !fake.ClearedPorts.Equal(tc.result.ClearedPorts) { - t.Errorf("Expected ClearedPorts=%v, got %v", tc.result.ClearedPorts, fake.ClearedPorts) + entry := &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstPort: testNodePort, + Protocol: proto, + }, + Reverse: netlink.IPTuple{ + Protocol: proto, + SrcIP: netutils.ParseIPSloppy(dnatDest), + }, } - if !reflect.DeepEqual(fake.ClearedNATs, tc.result.ClearedNATs) { - t.Errorf("Expected ClearedNATs=%v, got %v", tc.result.ClearedNATs, fake.ClearedNATs) + mockEntries = append(mockEntries, entry) + // we do not expect deleted or non-serving UDP endpoints entries to be present after cleanup + if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) { + expectedEntries = append(expectedEntries, entry) } - if !reflect.DeepEqual(fake.ClearedPortNATs, tc.result.ClearedPortNATs) { - t.Errorf("Expected ClearedPortNATs=%v, got %v", tc.result.ClearedPortNATs, fake.ClearedPortNATs) - } - }) - } -} - -func TestFilterForIP(t *testing.T) { - testCases := []struct { - name string - ip string - protocol v1.Protocol - expectedFamily netlink.InetFamily - expectedFilter *conntrackFilter - }{ - { - name: "ipv4 + UDP", - ip: "10.96.0.10", - protocol: v1.ProtocolUDP, - expectedFilter: &conntrackFilter{ - protocol: 17, - original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10")}, - }, - }, - { - name: "ipv6 + TCP", - ip: "2001:db8:1::2", - protocol: v1.ProtocolTCP, - expectedFilter: &conntrackFilter{ - protocol: 6, - original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2")}, - }, - }, + } } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedFilter, filterForIP(tc.ip, tc.protocol)) - }) - } -} - -func TestFilterForPort(t *testing.T) { - testCases := []struct { - name string - port int - protocol v1.Protocol - expectedFilter *conntrackFilter - }{ - { - name: "UDP", - port: 5000, - protocol: v1.ProtocolUDP, - - expectedFilter: &conntrackFilter{ - protocol: 17, - original: &connectionTuple{dstPort: 5000}, + // add some non-DNATed mock entries which should be cleared up by reconciler + // These will exist if the proxy don't have DROP/REJECT rule for service with + // no endpoints, --orig-dst and --reply-src will be same for these entries. + for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} { + entry := &netlink.ConntrackFlow{ + FamilyType: unix.AF_INET, + Forward: netlink.IPTuple{ + DstIP: netutils.ParseIPSloppy(ip), + Protocol: unix.IPPROTO_UDP, }, - }, - { - name: "SCTP", - port: 3000, - protocol: v1.ProtocolSCTP, - expectedFilter: &conntrackFilter{ - protocol: 132, - original: &connectionTuple{dstPort: 3000}, + Reverse: netlink.IPTuple{ + Protocol: unix.IPPROTO_UDP, + SrcIP: netutils.ParseIPSloppy(ip), }, - }, + } + mockEntries = append(mockEntries, entry) } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedFilter, filterForPort(tc.port, tc.protocol)) - }) + fake := NewFake() + fake.entries = mockEntries + CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap) + + actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily]) + require.Equal(t, len(expectedEntries), len(actualEntries)) + + // sort the actual flows before comparison + sort.Slice(actualEntries, func(i, j int) bool { + return actualEntries[i].String() < actualEntries[j].String() + }) + // sort the expected flows before comparison + sort.Slice(expectedEntries, func(i, j int) bool { + return expectedEntries[i].String() < expectedEntries[j].String() + }) + + for i := 0; i < len(expectedEntries); i++ { + require.Equal(t, expectedEntries[i], actualEntries[i]) } } diff --git a/pkg/proxy/conntrack/conntrack.go b/pkg/proxy/conntrack/conntrack.go index d58b38e5aef..1e01d654680 100644 --- a/pkg/proxy/conntrack/conntrack.go +++ b/pkg/proxy/conntrack/conntrack.go @@ -29,6 +29,7 @@ import ( // Interface for dealing with conntrack type Interface interface { + ListEntries(ipFamily uint8) ([]*netlink.ConntrackFlow, error) // ClearEntries deletes conntrack entries for connections of the given IP family, // filtered by the given filters. ClearEntries(ipFamily uint8, filters ...netlink.CustomConntrackFilter) (int, error) @@ -36,6 +37,7 @@ type Interface interface { // netlinkHandler allows consuming real and mockable implementation for testing. type netlinkHandler interface { + ConntrackTableList(netlink.ConntrackTableType, netlink.InetFamily) ([]*netlink.ConntrackFlow, error) ConntrackDeleteFilters(netlink.ConntrackTableType, netlink.InetFamily, ...netlink.CustomConntrackFilter) (uint, error) } @@ -54,6 +56,11 @@ func newConntracker(handler netlinkHandler) Interface { return &conntracker{handler: handler} } +// ListEntries list all conntrack entries for connections of the given IP family. +func (ct *conntracker) ListEntries(ipFamily uint8) ([]*netlink.ConntrackFlow, error) { + return ct.handler.ConntrackTableList(netlink.ConntrackTable, netlink.InetFamily(ipFamily)) +} + // ClearEntries deletes conntrack entries for connections of the given IP family, // filtered by the given filters. func (ct *conntracker) ClearEntries(ipFamily uint8, filters ...netlink.CustomConntrackFilter) (int, error) { @@ -64,8 +71,7 @@ func (ct *conntracker) ClearEntries(ipFamily uint8, filters ...netlink.CustomCon n, err := ct.handler.ConntrackDeleteFilters(netlink.ConntrackTable, netlink.InetFamily(ipFamily), filters...) if err != nil { - return 0, fmt.Errorf("error deleting conntrack entries, error: %w", err) + return int(n), fmt.Errorf("error deleting conntrack entries, error: %w", err) } - klog.V(4).InfoS("Cleared conntrack entries", "count", n) return int(n), nil } diff --git a/pkg/proxy/conntrack/conntrack_test.go b/pkg/proxy/conntrack/conntrack_test.go index 49eb49340f4..98f005a186c 100644 --- a/pkg/proxy/conntrack/conntrack_test.go +++ b/pkg/proxy/conntrack/conntrack_test.go @@ -35,6 +35,10 @@ type fakeHandler struct { filters []*conntrackFilter } +func (f *fakeHandler) ConntrackTableList(_ netlink.ConntrackTableType, _ netlink.InetFamily) ([]*netlink.ConntrackFlow, error) { + return nil, nil +} + func (f *fakeHandler) ConntrackDeleteFilters(tableType netlink.ConntrackTableType, family netlink.InetFamily, netlinkFilters ...netlink.CustomConntrackFilter) (uint, error) { f.tableType = tableType f.ipFamily = family @@ -48,7 +52,6 @@ func (f *fakeHandler) ConntrackDeleteFilters(tableType netlink.ConntrackTableTyp var _ netlinkHandler = (*fakeHandler)(nil) func TestConntracker_ClearEntries(t *testing.T) { - testCases := []struct { name string ipFamily uint8 diff --git a/pkg/proxy/conntrack/fake.go b/pkg/proxy/conntrack/fake.go index 201ed3324df..bedbb99b6e9 100644 --- a/pkg/proxy/conntrack/fake.go +++ b/pkg/proxy/conntrack/fake.go @@ -20,77 +20,42 @@ limitations under the License. package conntrack import ( - "fmt" - "github.com/vishvananda/netlink" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" ) // FakeInterface implements Interface by just recording entries that have been cleared. type FakeInterface struct { - ClearedIPs sets.Set[string] - ClearedPorts sets.Set[int] - ClearedNATs map[string]string // origin -> dest - ClearedPortNATs map[int]string // port -> dest + entries []*netlink.ConntrackFlow } var _ Interface = &FakeInterface{} // NewFake creates a new FakeInterface func NewFake() *FakeInterface { - fake := &FakeInterface{} - fake.Reset() - return fake + return &FakeInterface{entries: make([]*netlink.ConntrackFlow, 0)} } -// Reset clears fake's sets/maps -func (fake *FakeInterface) Reset() { - fake.ClearedIPs = sets.New[string]() - fake.ClearedPorts = sets.New[int]() - fake.ClearedNATs = make(map[string]string) - fake.ClearedPortNATs = make(map[int]string) +// ListEntries is part of Interface +func (fake *FakeInterface) ListEntries(_ uint8) ([]*netlink.ConntrackFlow, error) { + return fake.entries, nil } // ClearEntries is part of Interface func (fake *FakeInterface) ClearEntries(_ uint8, filters ...netlink.CustomConntrackFilter) (int, error) { - for _, anyFilter := range filters { - filter := anyFilter.(*conntrackFilter) - if filter.protocol != protocolMap[v1.ProtocolUDP] { - return 0, fmt.Errorf("FakeInterface currently only supports UDP") - } - - // record IP and Port entries - if filter.original != nil && filter.reply == nil { - if filter.original.dstIP != nil { - fake.ClearedIPs.Insert(filter.original.dstIP.String()) - } - if filter.original.dstPort != 0 { - fake.ClearedPorts.Insert(int(filter.original.dstPort)) + var flows []*netlink.ConntrackFlow + before := len(fake.entries) + for _, flow := range fake.entries { + var matched bool + for _, filter := range filters { + matched = filter.MatchConntrackFlow(flow) + if matched { + break } } - - // record NAT and NATPort entries - if filter.original != nil && filter.reply != nil { - if filter.original.dstIP != nil && filter.reply.srcIP != nil { - origin := filter.original.dstIP.String() - dest := filter.reply.srcIP.String() - if previous, exists := fake.ClearedNATs[origin]; exists && previous != dest { - return 0, fmt.Errorf("filter for NAT passed with same origin (%s), different destination (%s / %s)", origin, previous, dest) - } - fake.ClearedNATs[filter.original.dstIP.String()] = filter.reply.srcIP.String() - } - - if filter.original.dstPort != 0 && filter.reply.srcIP != nil { - dest := filter.reply.srcIP.String() - port := int(filter.original.dstPort) - if previous, exists := fake.ClearedPortNATs[port]; exists && previous != dest { - return 0, fmt.Errorf("filter for PortNAT passed with same port (%d), different destination (%s / %s)", port, previous, dest) - } - fake.ClearedPortNATs[port] = dest - } + if !matched { + flows = append(flows, flow) } } - return 0, nil + fake.entries = flows + return before - len(fake.entries), nil } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 9ff20521f5a..eb174c86f95 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1595,7 +1595,7 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) } func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 2378c075ee0..3b1f6fe96b9 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -932,7 +932,7 @@ func (proxier *Proxier) syncProxyRules() { // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) + _ = proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) proxier.logger.V(3).Info("Syncing ipvs proxier rules") @@ -1498,7 +1498,7 @@ func (proxier *Proxier) syncProxyRules() { metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 7c419b9c54e..257517e5478 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -1839,7 +1839,7 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) } func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {