Merge pull request #130542 from aroradaman/conntrack-service-target-port

Conntrack reconciler now considers service's target port during cleanup of stale flow entries
This commit is contained in:
Kubernetes Prow Robot
2025-06-23 07:47:55 -07:00
committed by GitHub
2 changed files with 129 additions and 135 deletions

View File

@@ -33,7 +33,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/metrics"
netutils "k8s.io/utils/net"
)
// Kubernetes UDP services can be affected by stale conntrack entries.
@@ -60,11 +59,11 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
}
}
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port
// to the set of serving endpoint IPs.
serviceIPEndpointIPs := make(map[string]sets.Set[string])
// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
serviceNodePortEndpointIPs := make(map[int]sets.Set[string])
// serviceIPEndpoints maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port
// to the set of serving endpoints (Endpoint IP and Port).
serviceIPEndpoints := make(map[string]sets.Set[string])
// serviceNodePortEndpoints maps service NodePort to the set of serving endpoints (Endpoint IP and Port).
serviceNodePortEndpoints := make(map[int]sets.Set[string])
for svcName, svc := range svcPortMap {
// we are only interested in UDP services
@@ -72,44 +71,45 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
continue
}
endpointIPs := sets.New[string]()
endpoints := sets.New[string]()
for _, endpoint := range endpointsMap[svcName] {
// We need to remove all the conntrack entries for a Service (IP or NodePort)
// that are not pointing to a serving endpoint.
// We map all the serving endpoint IPs to the service and clear all the conntrack
// We map all the serving endpoints of the service and clear all the conntrack
// entries which are destined for the service and are not DNATed to these endpoints.
// Changes to the service should not affect existing flows, so we do not take
// traffic policies, topology, or terminating status of the service into account.
// This ensures that the behavior of UDP services remains consistent with TCP
// services.
if endpoint.IsServing() {
endpointIPs.Insert(endpoint.IP())
portStr := strconv.Itoa(int(endpoint.Port()))
endpoints.Insert(net.JoinHostPort(endpoint.IP(), portStr))
}
}
// a Service without endpoints does not require cleaning up its associated conntrack entries.
if endpointIPs.Len() == 0 {
if endpoints.Len() == 0 {
continue
}
// we need to filter entries that are directed to a Service IP:Port frontend
// that does not have a backend as part of the endpoints IPs
// that does not have an Endpoint IP:Port backend as part of the serving endpoints
portStr := strconv.Itoa(svc.Port())
// clusterIP:Port
serviceIPEndpointIPs[net.JoinHostPort(svc.ClusterIP().String(), portStr)] = endpointIPs
serviceIPEndpoints[net.JoinHostPort(svc.ClusterIP().String(), portStr)] = endpoints
// loadbalancerIP:Port
for _, loadBalancerIP := range svc.LoadBalancerVIPs() {
serviceIPEndpointIPs[net.JoinHostPort(loadBalancerIP.String(), portStr)] = endpointIPs
serviceIPEndpoints[net.JoinHostPort(loadBalancerIP.String(), portStr)] = endpoints
}
// externalIP:Port
for _, externalIP := range svc.ExternalIPs() {
serviceIPEndpointIPs[net.JoinHostPort(externalIP.String(), portStr)] = endpointIPs
serviceIPEndpoints[net.JoinHostPort(externalIP.String(), portStr)] = endpoints
}
// we need to filter entries that are directed to a *:NodePort
// that does not have a backend as part of the endpoints IPs
// we need to filter entries that are directed to a *:NodePort that does not have
// an Endpoint IP:Port backend as part of the serving endpoints
if svc.NodePort() != 0 {
// *:NodePort
serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs
serviceNodePortEndpoints[svc.NodePort()] = endpoints
}
}
@@ -123,22 +123,25 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
origDst := entry.Forward.DstIP.String() // match Service IP
origPortDst := int(entry.Forward.DstPort) // match Service Port
origPortDstStr := strconv.Itoa(origPortDst)
replySrc := entry.Reverse.SrcIP.String() // match Serving Endpoint IP
replySrc := entry.Reverse.SrcIP.String() // match Serving Endpoint IP
replyPortSrc := int(entry.Reverse.SrcPort) // match Serving Endpoint Port
replyPortSrcStr := strconv.Itoa(replyPortSrc)
// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
// LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) of the flow is service Port
// and the reply source (--reply-src) is not IP of any serving endpoint, we clear the entry.
endpoints, ok := serviceIPEndpointIPs[net.JoinHostPort(origDst, origPortDstStr)]
if ok && !endpoints.Has(replySrc) {
filters = append(filters, filterForIPPortNAT(origDst, replySrc, entry.Forward.DstPort, v1.ProtocolUDP))
// LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) is service Port and
// the reply source IP (--reply-src) and port (--reply-port-src) do not
// represent a serving endpoint of the service, we clear the entry.
endpoints, ok := serviceIPEndpoints[net.JoinHostPort(origDst, origPortDstStr)]
if ok && !endpoints.Has(net.JoinHostPort(replySrc, replyPortSrcStr)) {
filters = append(filters, filterForIPPortNAT(entry.Forward.DstIP, entry.Forward.DstPort, entry.Reverse.SrcIP, entry.Reverse.SrcPort, v1.ProtocolUDP))
}
// if the original port destination (--orig-port-dst) of the flow is service
// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
// we clear the entry.
endpoints, ok = serviceNodePortEndpointIPs[origPortDst]
if ok && !endpoints.Has(replySrc) {
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
// if the original destination port (--orig-port-dst) of the entry is service
// NodePort and the reply source IP (--reply-src) and port (--reply-port-src)
// do not represent a serving endpoint of the service, we clear the entry.
endpoints, ok = serviceNodePortEndpoints[origPortDst]
if ok && !endpoints.Has(net.JoinHostPort(replySrc, replyPortSrcStr)) {
filters = append(filters, filterForPortNAT(entry.Forward.DstPort, entry.Reverse.SrcIP, entry.Reverse.SrcPort, v1.ProtocolUDP))
}
}
@@ -168,32 +171,34 @@ var protocolMap = map[v1.Protocol]uint8{
// filterForIPPortNAT returns *conntrackFilter to delete the conntrack entries for connections
// specified by the destination IP (original direction) and destination port (original direction)
// and source IP (reply direction).
func filterForIPPortNAT(origin, dest string, dstPort uint16, protocol v1.Protocol) *conntrackFilter {
klog.V(6).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
// and source IP (reply direction) and source port (reply direction).
func filterForIPPortNAT(origDst net.IP, origPortDst uint16, replySrc net.IP, replyPortSrc uint16, protocol v1.Protocol) *conntrackFilter {
klog.V(6).InfoS("Adding conntrack filter for cleanup", "orig-dst", origDst.String(), "orig-port-dst", origPortDst, "reply-src", replySrc.String(), "reply-port-src", replyPortSrc, "protocol", protocol)
return &conntrackFilter{
protocol: protocolMap[protocol],
original: &connectionTuple{
dstIP: netutils.ParseIPSloppy(origin),
dstPort: dstPort,
dstIP: origDst,
dstPort: origPortDst,
},
reply: &connectionTuple{
srcIP: netutils.ParseIPSloppy(dest),
srcIP: replySrc,
srcPort: replyPortSrc,
},
}
}
// filterForPortNAT returns *conntrackFilter to delete the conntrack entries for connections
// specified by the destination Port (original direction) and source IP (reply direction).
func filterForPortNAT(dest string, port int, protocol v1.Protocol) *conntrackFilter {
klog.V(6).InfoS("Adding conntrack filter for cleanup", "org-port-dst", port, "reply-src", dest, "protocol", protocol)
// filterForPortNAT returns *conntrackFilter to delete the conntrack entries for connections specified by the
// destination Port (original direction) and source IP (reply direction) and source port (reply direction).
func filterForPortNAT(origPortDst uint16, replySrc net.IP, replyPortSrc uint16, protocol v1.Protocol) *conntrackFilter {
klog.V(6).InfoS("Adding conntrack filter for cleanup", "orig-port-dst", origPortDst, "reply-src", replySrc.String(), "reply-port-src", replyPortSrc, "protocol", protocol)
return &conntrackFilter{
protocol: protocolMap[protocol],
original: &connectionTuple{
dstPort: uint16(port),
dstPort: origPortDst,
},
reply: &connectionTuple{
srcIP: netutils.ParseIPSloppy(dest),
srcIP: replySrc,
srcPort: replyPortSrc,
},
}
}

View File

@@ -22,6 +22,7 @@ package conntrack
import (
"fmt"
"math/rand"
"net"
"sort"
"testing"
@@ -55,6 +56,10 @@ const (
testNonServingEndpointIP = "10.240.1.5"
testDeletedEndpointIP = "10.240.2.6"
// testOldEndpointPort is used to cover cases when endpoint changes port,
// but IP remains same.
testOldEndpointPort = 8080
testEndpointPort = 9090
testServicePort = 8000
testServiceNodePort = 32000
// testNonServicePort is used to mock conntrack flow entries which are not owned by
@@ -62,6 +67,27 @@ const (
testNonServicePort = 3000
)
// generateConntrackEntry builds a *netlink.ConntrackFlow for use in unit tests.
//
// The flow's FamilyType is always unix.AF_INET. The forward (original direction)
// tuple carries origPortDst as its destination port, the reverse (reply direction)
// tuple carries replySrc / replyPortSrc as its source IP and port, and both tuples
// use proto as their protocol. The forward destination IP is populated from origDst
// only when origDst is non-empty.
func generateConntrackEntry(origDst string, origPortDst uint16, replySrc string, replyPortSrc uint16, proto uint8) *netlink.ConntrackFlow {
	forward := netlink.IPTuple{
		DstPort:  origPortDst,
		Protocol: proto,
	}
	// node port services (*:NodePort) are not matched on --orig-dst, so callers pass
	// an empty origDst for them and the forward destination IP is left unset.
	if origDst != "" {
		forward.DstIP = netutils.ParseIPSloppy(origDst)
	}

	reverse := netlink.IPTuple{
		Protocol: proto,
		SrcIP:    netutils.ParseIPSloppy(replySrc),
		SrcPort:  replyPortSrc,
	}

	return &netlink.ConntrackFlow{
		FamilyType: unix.AF_INET,
		Forward:    forward,
		Reverse:    reverse,
	}
}
func TestCleanStaleEntries(t *testing.T) {
// We need to construct proxy.ServicePortMap and proxy.EndpointsMap to pass to
// CleanStaleEntries. ServicePortMap and EndpointsMap are just maps, but there are
@@ -134,17 +160,17 @@ func TestCleanStaleEntries(t *testing.T) {
Ports: []discovery.EndpointPort{
{
Name: ptr.To("test-tcp"),
Port: ptr.To(int32(testServicePort)),
Port: ptr.To(int32(testEndpointPort)),
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("test-udp"),
Port: ptr.To(int32(testServicePort)),
Port: ptr.To(int32(testEndpointPort)),
Protocol: ptr.To(v1.ProtocolUDP),
},
{
Name: ptr.To("test-sctp"),
Port: ptr.To(int32(testServicePort)),
Port: ptr.To(int32(testEndpointPort)),
Protocol: ptr.To(v1.ProtocolSCTP),
},
},
@@ -229,24 +255,19 @@ func TestCleanStaleEntries(t *testing.T) {
// entriesBeforeCleanup - entriesAfterCleanup = entries cleared by conntrack reconciler
var entriesAfterCleanup []*netlink.ConntrackFlow
// we create 3 fake flow entries (one per service IP) with `testOldEndpointPort`; this simulates the case when
// endpoints change port without changing IP. These entries should be cleared by reconciler.
for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
entry := generateConntrackEntry(origDest, testServicePort, testServingEndpointIP, testOldEndpointPort, unix.IPPROTO_UDP)
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
}
// we create 63 fake flow entries (3 Endpoints * 3 Protocols * (3 (ServiceIP:ServicePort) + 3 (ServiceIP:NonServicePort) + 1 (NodePort)))
for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} {
for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} {
for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
for _, port := range []uint16{testServicePort, testNonServicePort} {
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(origDest),
DstPort: port,
Protocol: proto,
},
Reverse: netlink.IPTuple{
Protocol: proto,
SrcIP: netutils.ParseIPSloppy(dnatDest),
},
}
entry := generateConntrackEntry(origDest, port, dnatDest, testEndpointPort, proto)
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
if proto == unix.IPPROTO_UDP && port == testServicePort && dnatDest != testServingEndpointIP {
// we do not expect UDP entries with destination port `testServicePort` and DNATed destination
@@ -257,17 +278,7 @@ func TestCleanStaleEntries(t *testing.T) {
}
}
entry := &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstPort: testServiceNodePort,
Protocol: proto,
},
Reverse: netlink.IPTuple{
Protocol: proto,
SrcIP: netutils.ParseIPSloppy(dnatDest),
},
}
entry := generateConntrackEntry("", testServiceNodePort, dnatDest, testEndpointPort, proto)
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
if proto == unix.IPPROTO_UDP && dnatDest != testServingEndpointIP {
// we do not expect UDP entries with DNATed destination address not
@@ -387,7 +398,7 @@ func TestPerformanceCleanStaleEntries(t *testing.T) {
Ports: []discovery.EndpointPort{
{
Name: ptr.To("test-udp"),
Port: ptr.To(int32(testServicePort)),
Port: ptr.To(int32(testEndpointPort)),
Protocol: ptr.To(v1.ProtocolUDP),
},
},
@@ -397,27 +408,12 @@ func TestPerformanceCleanStaleEntries(t *testing.T) {
endpointsMap := make(proxy.EndpointsMap)
_ = endpointsMap.Update(ect)
genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow {
return &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(testExternalIP),
DstPort: dstPort,
Protocol: unix.IPPROTO_UDP,
},
Reverse: netlink.IPTuple{
Protocol: unix.IPPROTO_UDP,
SrcIP: netutils.ParseIPSloppy(revSrc),
},
}
}
fake := NewFake()
// 1 valid entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP))
fake.entries = append(fake.entries, generateConntrackEntry(testExternalIP, testServicePort, testServingEndpointIP, testEndpointPort, unix.IPPROTO_UDP))
expectedEntries := 1
// 1 stale entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP))
fake.entries = append(fake.entries, generateConntrackEntry(testExternalIP, testServicePort, testDeletedEndpointIP, testEndpointPort, unix.IPPROTO_UDP))
expectedDeleted := 1
// 1 M to the Service IP with random ports
for i := 0; i < 1000*1000; i++ {
@@ -427,7 +423,7 @@ func TestPerformanceCleanStaleEntries(t *testing.T) {
} else {
expectedEntries++
}
fake.entries = append(fake.entries, genConntrackEntry(port, testDeletedEndpointIP))
fake.entries = append(fake.entries, generateConntrackEntry(testExternalIP, port, testDeletedEndpointIP, testEndpointPort, unix.IPPROTO_UDP))
}
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
@@ -497,7 +493,7 @@ func TestServiceWithoutEndpoints(t *testing.T) {
Ports: []discovery.EndpointPort{
{
Name: ptr.To("test-udp"),
Port: ptr.To(int32(testServicePort)),
Port: ptr.To(int32(testEndpointPort)),
Protocol: ptr.To(v1.ProtocolUDP),
},
},
@@ -507,26 +503,11 @@ func TestServiceWithoutEndpoints(t *testing.T) {
endpointsMap := make(proxy.EndpointsMap)
_ = endpointsMap.Update(ect)
genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow {
return &netlink.ConntrackFlow{
FamilyType: unix.AF_INET,
Forward: netlink.IPTuple{
DstIP: netutils.ParseIPSloppy(testExternalIP),
DstPort: dstPort,
Protocol: unix.IPPROTO_UDP,
},
Reverse: netlink.IPTuple{
Protocol: unix.IPPROTO_UDP,
SrcIP: netutils.ParseIPSloppy(revSrc),
},
}
}
fake := NewFake()
// 1 valid entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP))
fake.entries = append(fake.entries, generateConntrackEntry(testExternalIP, testServicePort, testServingEndpointIP, testEndpointPort, unix.IPPROTO_UDP))
// 1 stale entry
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP))
fake.entries = append(fake.entries, generateConntrackEntry(testExternalIP, testServicePort, testDeletedEndpointIP, testEndpointPort, unix.IPPROTO_UDP))
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
@@ -535,42 +516,47 @@ func TestServiceWithoutEndpoints(t *testing.T) {
}
}
func TestFilterForNAT(t *testing.T) {
func TestFilterForIPPortNAT(t *testing.T) {
testCases := []struct {
name string
orig string
dest string
dstPort uint16
origDst net.IP
origPortDst uint16
replySrc net.IP
replySrcPort uint16
protocol v1.Protocol
expectedFilter *conntrackFilter
}{
{
name: "ipv4 + SCTP",
orig: "10.96.0.10",
dest: "10.244.0.3",
protocol: v1.ProtocolSCTP,
name: "ipv4 + SCTP",
origDst: netutils.ParseIPSloppy("10.96.0.10"),
origPortDst: 80,
replySrc: netutils.ParseIPSloppy("10.244.0.3"),
replySrcPort: 3000,
protocol: v1.ProtocolSCTP,
expectedFilter: &conntrackFilter{
protocol: 132,
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10")},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("10.244.0.3")},
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10"), dstPort: 80},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("10.244.0.3"), srcPort: 3000},
},
},
{
name: "ipv6 + UDP",
orig: "2001:db8:1::2",
dest: "4001:ab8::2",
protocol: v1.ProtocolUDP,
name: "ipv6 + UDP",
origDst: netutils.ParseIPSloppy("2001:db8:1::2"),
origPortDst: 443,
replySrc: netutils.ParseIPSloppy("4001:ab8::2"),
replySrcPort: 5000,
protocol: v1.ProtocolUDP,
expectedFilter: &conntrackFilter{
protocol: 17,
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2")},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("4001:ab8::2")},
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2"), dstPort: 443},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("4001:ab8::2"), srcPort: 5000},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedFilter, filterForIPPortNAT(tc.orig, tc.dest, tc.dstPort, tc.protocol))
require.Equal(t, tc.expectedFilter, filterForIPPortNAT(tc.origDst, tc.origPortDst, tc.replySrc, tc.replySrcPort, tc.protocol))
})
}
}
@@ -578,39 +564,42 @@ func TestFilterForNAT(t *testing.T) {
func TestFilterForPortNAT(t *testing.T) {
testCases := []struct {
name string
dest string
port int
origPortDst uint16
replySrc net.IP
replySrcPort uint16
protocol v1.Protocol
expectedFamily netlink.InetFamily
expectedFilter *conntrackFilter
}{
{
name: "ipv4 + TCP",
dest: "10.96.0.10",
port: 80,
protocol: v1.ProtocolTCP,
name: "ipv4 + TCP",
origPortDst: 80,
replySrc: netutils.ParseIPSloppy("10.96.0.10"),
replySrcPort: 3000,
protocol: v1.ProtocolTCP,
expectedFilter: &conntrackFilter{
protocol: 6,
original: &connectionTuple{dstPort: 80},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("10.96.0.10")},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("10.96.0.10"), srcPort: 3000},
},
},
{
name: "ipv6 + UDP",
dest: "2001:db8:1::2",
port: 8000,
protocol: v1.ProtocolUDP,
name: "ipv6 + UDP",
origPortDst: 8000,
replySrc: netutils.ParseIPSloppy("2001:db8:1::2"),
replySrcPort: 5000,
protocol: v1.ProtocolUDP,
expectedFilter: &conntrackFilter{
protocol: 17,
original: &connectionTuple{dstPort: 8000},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("2001:db8:1::2")},
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("2001:db8:1::2"), srcPort: 5000},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedFilter, filterForPortNAT(tc.dest, tc.port, tc.protocol))
require.Equal(t, tc.expectedFilter, filterForPortNAT(tc.origPortDst, tc.replySrc, tc.replySrcPort, tc.protocol))
})
}
}