mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Merge pull request #130505 from aojea/automated-cherry-pick-of-#130484-upstream-release-1.32
Automated cherry pick of #130484: conntrack reconciler must check the dst port
This commit is contained in:
commit
b0554e32c7
@ -20,6 +20,8 @@ limitations under the License.
|
|||||||
package conntrack
|
package conntrack
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/vishvananda/netlink"
|
"github.com/vishvananda/netlink"
|
||||||
@ -32,8 +34,14 @@ import (
|
|||||||
netutils "k8s.io/utils/net"
|
netutils "k8s.io/utils/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Kubernetes UDP services can be affected by stale conntrack entries.
|
||||||
|
// These entries may point to endpoints that no longer exist,
|
||||||
|
// leading to packet loss and connectivity problems.
|
||||||
|
|
||||||
// CleanStaleEntries scans conntrack table and removes any entries
|
// CleanStaleEntries scans conntrack table and removes any entries
|
||||||
// for a service that do not correspond to a serving endpoint.
|
// for a service that do not correspond to a serving endpoint.
|
||||||
|
// List existing conntrack entries and calculate the desired conntrack state
|
||||||
|
// based on the current Services and Endpoints.
|
||||||
func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
|
func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
|
||||||
svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) {
|
svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) {
|
||||||
|
|
||||||
@ -46,7 +54,7 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs)
|
// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs) and Service Port
|
||||||
// to the set of serving endpoint IPs.
|
// to the set of serving endpoint IPs.
|
||||||
serviceIPEndpointIPs := make(map[string]sets.Set[string])
|
serviceIPEndpointIPs := make(map[string]sets.Set[string])
|
||||||
// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
|
// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
|
||||||
@ -73,14 +81,28 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceIPEndpointIPs[svc.ClusterIP().String()] = endpointIPs
|
// a Service without endpoints does not require to clean the conntrack entries associated.
|
||||||
|
if endpointIPs.Len() == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need to filter entries that are directed to a Service IP:Port frontend
|
||||||
|
// that does not have a backend as part of the endpoints IPs
|
||||||
|
portStr := strconv.Itoa(svc.Port())
|
||||||
|
// clusterIP:Port
|
||||||
|
serviceIPEndpointIPs[net.JoinHostPort(svc.ClusterIP().String(), portStr)] = endpointIPs
|
||||||
|
// loadbalancerIP:Port
|
||||||
for _, loadBalancerIP := range svc.LoadBalancerVIPs() {
|
for _, loadBalancerIP := range svc.LoadBalancerVIPs() {
|
||||||
serviceIPEndpointIPs[loadBalancerIP.String()] = endpointIPs
|
serviceIPEndpointIPs[net.JoinHostPort(loadBalancerIP.String(), portStr)] = endpointIPs
|
||||||
}
|
}
|
||||||
|
// externalIP:Port
|
||||||
for _, externalIP := range svc.ExternalIPs() {
|
for _, externalIP := range svc.ExternalIPs() {
|
||||||
serviceIPEndpointIPs[externalIP.String()] = endpointIPs
|
serviceIPEndpointIPs[net.JoinHostPort(externalIP.String(), portStr)] = endpointIPs
|
||||||
}
|
}
|
||||||
|
// we need to filter entries that are directed to a *:NodePort
|
||||||
|
// that does not have a backend as part of the endpoints IPs
|
||||||
if svc.NodePort() != 0 {
|
if svc.NodePort() != 0 {
|
||||||
|
// *:NodePort
|
||||||
serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs
|
serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -92,26 +114,25 @@ func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
origDst := entry.Forward.DstIP.String()
|
origDst := entry.Forward.DstIP.String() // match Service IP
|
||||||
origPortDst := int(entry.Forward.DstPort)
|
origPortDst := int(entry.Forward.DstPort) // match Service Port
|
||||||
replySrc := entry.Reverse.SrcIP.String()
|
origPortDstStr := strconv.Itoa(origPortDst)
|
||||||
|
replySrc := entry.Reverse.SrcIP.String() // match Serving Endpoint IP
|
||||||
|
|
||||||
// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
|
// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
|
||||||
// LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of
|
// LoadBalancerIPs or ExternalIPs) and (--orig-port-dst) of the flow is service Port
|
||||||
// any serving endpoint, we clear the entry.
|
// and the reply source (--reply-src) is not IP of any serving endpoint, we clear the entry.
|
||||||
if _, ok := serviceIPEndpointIPs[origDst]; ok {
|
endpoints, ok := serviceIPEndpointIPs[net.JoinHostPort(origDst, origPortDstStr)]
|
||||||
if !serviceIPEndpointIPs[origDst].Has(replySrc) {
|
if ok && !endpoints.Has(replySrc) {
|
||||||
filters = append(filters, filterForNAT(origDst, replySrc, v1.ProtocolUDP))
|
filters = append(filters, filterForIPPortNAT(origDst, replySrc, entry.Forward.DstPort, v1.ProtocolUDP))
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the original port destination (--orig-port-dst) of the flow is service
|
// if the original port destination (--orig-port-dst) of the flow is service
|
||||||
// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
|
// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
|
||||||
// we clear the entry.
|
// we clear the entry.
|
||||||
if _, ok := serviceNodePortEndpointIPs[origPortDst]; ok {
|
endpoints, ok = serviceNodePortEndpointIPs[origPortDst]
|
||||||
if !serviceNodePortEndpointIPs[origPortDst].Has(replySrc) {
|
if ok && !endpoints.Has(replySrc) {
|
||||||
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
|
filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,14 +157,16 @@ var protocolMap = map[v1.Protocol]uint8{
|
|||||||
v1.ProtocolSCTP: unix.IPPROTO_SCTP,
|
v1.ProtocolSCTP: unix.IPPROTO_SCTP,
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
|
// filterForIPPortNAT returns *conntrackFilter to delete the conntrack entries for connections
|
||||||
// specified by the destination IP (original direction) and source IP (reply direction).
|
// specified by the destination IP (original direction) and destination port (original direction)
|
||||||
func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter {
|
// and source IP (reply direction).
|
||||||
klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
|
func filterForIPPortNAT(origin, dest string, dstPort uint16, protocol v1.Protocol) *conntrackFilter {
|
||||||
|
klog.V(6).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
|
||||||
return &conntrackFilter{
|
return &conntrackFilter{
|
||||||
protocol: protocolMap[protocol],
|
protocol: protocolMap[protocol],
|
||||||
original: &connectionTuple{
|
original: &connectionTuple{
|
||||||
dstIP: netutils.ParseIPSloppy(origin),
|
dstIP: netutils.ParseIPSloppy(origin),
|
||||||
|
dstPort: dstPort,
|
||||||
},
|
},
|
||||||
reply: &connectionTuple{
|
reply: &connectionTuple{
|
||||||
srcIP: netutils.ParseIPSloppy(dest),
|
srcIP: netutils.ParseIPSloppy(dest),
|
||||||
|
@ -21,9 +21,11 @@ package conntrack
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/vishvananda/netlink"
|
"github.com/vishvananda/netlink"
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
@ -50,8 +52,11 @@ const (
|
|||||||
testNonServingEndpointIP = "10.240.1.5"
|
testNonServingEndpointIP = "10.240.1.5"
|
||||||
testDeletedEndpointIP = "10.240.2.6"
|
testDeletedEndpointIP = "10.240.2.6"
|
||||||
|
|
||||||
testPort = 8000
|
testServicePort = 8000
|
||||||
testNodePort = 32000
|
testServiceNodePort = 32000
|
||||||
|
// testNonServicePort is used to mock conntrack flow entries which are not owned by
|
||||||
|
// kube-proxy and reconciler should not consider these for cleanup
|
||||||
|
testNonServicePort = 3000
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCleanStaleEntries(t *testing.T) {
|
func TestCleanStaleEntries(t *testing.T) {
|
||||||
@ -74,19 +79,19 @@ func TestCleanStaleEntries(t *testing.T) {
|
|||||||
Ports: []v1.ServicePort{
|
Ports: []v1.ServicePort{
|
||||||
{
|
{
|
||||||
Name: "test-tcp",
|
Name: "test-tcp",
|
||||||
Port: testPort,
|
Port: testServicePort,
|
||||||
Protocol: v1.ProtocolTCP,
|
Protocol: v1.ProtocolTCP,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "test-udp",
|
Name: "test-udp",
|
||||||
Port: testPort,
|
Port: testServicePort,
|
||||||
NodePort: testNodePort,
|
NodePort: testServiceNodePort,
|
||||||
Protocol: v1.ProtocolUDP,
|
Protocol: v1.ProtocolUDP,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "test-sctp",
|
Name: "test-sctp",
|
||||||
Port: testPort,
|
Port: testServicePort,
|
||||||
NodePort: testNodePort,
|
NodePort: testServiceNodePort,
|
||||||
Protocol: v1.ProtocolSCTP,
|
Protocol: v1.ProtocolSCTP,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -126,17 +131,17 @@ func TestCleanStaleEntries(t *testing.T) {
|
|||||||
Ports: []discovery.EndpointPort{
|
Ports: []discovery.EndpointPort{
|
||||||
{
|
{
|
||||||
Name: ptr.To("test-tcp"),
|
Name: ptr.To("test-tcp"),
|
||||||
Port: ptr.To(int32(testPort)),
|
Port: ptr.To(int32(testServicePort)),
|
||||||
Protocol: ptr.To(v1.ProtocolTCP),
|
Protocol: ptr.To(v1.ProtocolTCP),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: ptr.To("test-udp"),
|
Name: ptr.To("test-udp"),
|
||||||
Port: ptr.To(int32(testPort)),
|
Port: ptr.To(int32(testServicePort)),
|
||||||
Protocol: ptr.To(v1.ProtocolUDP),
|
Protocol: ptr.To(v1.ProtocolUDP),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: ptr.To("test-sctp"),
|
Name: ptr.To("test-sctp"),
|
||||||
Port: ptr.To(int32(testPort)),
|
Port: ptr.To(int32(testServicePort)),
|
||||||
Protocol: ptr.To(v1.ProtocolSCTP),
|
Protocol: ptr.To(v1.ProtocolSCTP),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -212,35 +217,47 @@ func TestCleanStaleEntries(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// mock existing entries before cleanup
|
// The following mock conntrack flow entries `entriesBeforeCleanup` and `entriesAfterCleanup`
|
||||||
// we create 36 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIPs) + 1 (NodePort))
|
// represent conntrack flow entries before and after reconciler cleanup loop. Before cleanup,
|
||||||
var mockEntries []*netlink.ConntrackFlow
|
// reconciler lists the conntrack flows, receiving `entriesBeforeCleanup` and after cleanup,
|
||||||
// expectedEntries are the entries on which we will assert the cleanup logic
|
// we list the conntrack flows and assert them to match with `entriesAfterCleanup`.
|
||||||
var expectedEntries []*netlink.ConntrackFlow
|
// {entriesBeforeCleanup} - {entriesAfterCleanup} = entries cleared by conntrack reconciler
|
||||||
|
var entriesBeforeCleanup []*netlink.ConntrackFlow
|
||||||
|
// entriesBeforeCleanup - entriesAfterCleanup = entries cleared by conntrack reconciler
|
||||||
|
var entriesAfterCleanup []*netlink.ConntrackFlow
|
||||||
|
|
||||||
|
// we create 63 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIP:ServicePort) + 3 (ServiceIP:NonServicePort) + 1 (NodePort))
|
||||||
for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} {
|
for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} {
|
||||||
for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} {
|
for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} {
|
||||||
|
|
||||||
for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
|
for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
|
||||||
entry := &netlink.ConntrackFlow{
|
for _, port := range []uint16{testServicePort, testNonServicePort} {
|
||||||
FamilyType: unix.AF_INET,
|
entry := &netlink.ConntrackFlow{
|
||||||
Forward: netlink.IPTuple{
|
FamilyType: unix.AF_INET,
|
||||||
DstIP: netutils.ParseIPSloppy(origDest),
|
Forward: netlink.IPTuple{
|
||||||
Protocol: proto,
|
DstIP: netutils.ParseIPSloppy(origDest),
|
||||||
},
|
DstPort: port,
|
||||||
Reverse: netlink.IPTuple{
|
Protocol: proto,
|
||||||
Protocol: proto,
|
},
|
||||||
SrcIP: netutils.ParseIPSloppy(dnatDest),
|
Reverse: netlink.IPTuple{
|
||||||
},
|
Protocol: proto,
|
||||||
}
|
SrcIP: netutils.ParseIPSloppy(dnatDest),
|
||||||
mockEntries = append(mockEntries, entry)
|
},
|
||||||
// we do not expect deleted or non-serving UDP endpoints flows to be present after cleanup
|
}
|
||||||
if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) {
|
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
|
||||||
expectedEntries = append(expectedEntries, entry)
|
if proto == unix.IPPROTO_UDP && port == testServicePort && dnatDest != testServingEndpointIP {
|
||||||
|
// we do not expect UDP entries with destination port `testServicePort` and DNATed destination
|
||||||
|
// address not an address of serving endpoint to be present after cleanup.
|
||||||
|
} else {
|
||||||
|
entriesAfterCleanup = append(entriesAfterCleanup, entry)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entry := &netlink.ConntrackFlow{
|
entry := &netlink.ConntrackFlow{
|
||||||
FamilyType: unix.AF_INET,
|
FamilyType: unix.AF_INET,
|
||||||
Forward: netlink.IPTuple{
|
Forward: netlink.IPTuple{
|
||||||
DstPort: testNodePort,
|
DstPort: testServiceNodePort,
|
||||||
Protocol: proto,
|
Protocol: proto,
|
||||||
},
|
},
|
||||||
Reverse: netlink.IPTuple{
|
Reverse: netlink.IPTuple{
|
||||||
@ -248,50 +265,264 @@ func TestCleanStaleEntries(t *testing.T) {
|
|||||||
SrcIP: netutils.ParseIPSloppy(dnatDest),
|
SrcIP: netutils.ParseIPSloppy(dnatDest),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
mockEntries = append(mockEntries, entry)
|
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
|
||||||
// we do not expect deleted or non-serving UDP endpoints entries to be present after cleanup
|
if proto == unix.IPPROTO_UDP && dnatDest != testServingEndpointIP {
|
||||||
if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) {
|
// we do not expect UDP entries with DNATed destination address not
|
||||||
expectedEntries = append(expectedEntries, entry)
|
// an address of serving endpoint to be present after cleanup.
|
||||||
|
} else {
|
||||||
|
entriesAfterCleanup = append(entriesAfterCleanup, entry)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add some non-DNATed mock entries which should be cleared up by reconciler
|
// add 6 non-DNATed mock entries which should be cleared up by reconciler
|
||||||
// These will exist if the proxy don't have DROP/REJECT rule for service with
|
// These will exist if the proxy don't have DROP/REJECT rule for service with
|
||||||
// no endpoints, --orig-dst and --reply-src will be same for these entries.
|
// no endpoints, --orig-dst and --reply-src will be same for these entries.
|
||||||
for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
|
for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
|
||||||
entry := &netlink.ConntrackFlow{
|
for _, port := range []uint16{testServicePort, testNonServicePort} {
|
||||||
FamilyType: unix.AF_INET,
|
entry := &netlink.ConntrackFlow{
|
||||||
Forward: netlink.IPTuple{
|
FamilyType: unix.AF_INET,
|
||||||
DstIP: netutils.ParseIPSloppy(ip),
|
Forward: netlink.IPTuple{
|
||||||
Protocol: unix.IPPROTO_UDP,
|
DstIP: netutils.ParseIPSloppy(ip),
|
||||||
},
|
DstPort: port,
|
||||||
Reverse: netlink.IPTuple{
|
Protocol: unix.IPPROTO_UDP,
|
||||||
Protocol: unix.IPPROTO_UDP,
|
},
|
||||||
SrcIP: netutils.ParseIPSloppy(ip),
|
Reverse: netlink.IPTuple{
|
||||||
},
|
Protocol: unix.IPPROTO_UDP,
|
||||||
|
SrcIP: netutils.ParseIPSloppy(ip),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
entriesBeforeCleanup = append(entriesBeforeCleanup, entry)
|
||||||
|
// we do not expect entries with destination port `testServicePort` to be
|
||||||
|
// present after cleanup.
|
||||||
|
if port != testServicePort {
|
||||||
|
entriesAfterCleanup = append(entriesAfterCleanup, entry)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
mockEntries = append(mockEntries, entry)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
t.Logf("entries before cleanup %d after cleanup %d", len(entriesBeforeCleanup), len(entriesAfterCleanup))
|
||||||
fake := NewFake()
|
fake := NewFake()
|
||||||
fake.entries = mockEntries
|
fake.entries = entriesBeforeCleanup
|
||||||
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
|
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
|
||||||
|
|
||||||
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
|
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
|
||||||
require.Equal(t, len(expectedEntries), len(actualEntries))
|
require.Equal(t, len(entriesAfterCleanup), len(actualEntries))
|
||||||
|
|
||||||
// sort the actual flows before comparison
|
// sort the actual flows before comparison
|
||||||
sort.Slice(actualEntries, func(i, j int) bool {
|
sort.Slice(actualEntries, func(i, j int) bool {
|
||||||
return actualEntries[i].String() < actualEntries[j].String()
|
return actualEntries[i].String() < actualEntries[j].String()
|
||||||
})
|
})
|
||||||
// sort the expected flows before comparison
|
// sort the expected flows before comparison
|
||||||
sort.Slice(expectedEntries, func(i, j int) bool {
|
sort.Slice(entriesAfterCleanup, func(i, j int) bool {
|
||||||
return expectedEntries[i].String() < expectedEntries[j].String()
|
return entriesAfterCleanup[i].String() < entriesAfterCleanup[j].String()
|
||||||
})
|
})
|
||||||
|
|
||||||
for i := 0; i < len(expectedEntries); i++ {
|
if diff := cmp.Diff(entriesAfterCleanup, actualEntries); len(diff) > 0 {
|
||||||
require.Equal(t, expectedEntries[i], actualEntries[i])
|
t.Errorf("unexpected entries after cleanup: %s", diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPerformanceCleanStaleEntries(t *testing.T) {
|
||||||
|
sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
|
||||||
|
svc := &v1.Service{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: testServiceName,
|
||||||
|
Namespace: testServiceNamespace,
|
||||||
|
},
|
||||||
|
Spec: v1.ServiceSpec{
|
||||||
|
ClusterIP: testClusterIP,
|
||||||
|
ExternalIPs: []string{testExternalIP},
|
||||||
|
Ports: []v1.ServicePort{
|
||||||
|
{
|
||||||
|
Name: "test-udp",
|
||||||
|
Port: testServicePort,
|
||||||
|
Protocol: v1.ProtocolUDP,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.ServiceStatus{
|
||||||
|
LoadBalancer: v1.LoadBalancerStatus{
|
||||||
|
Ingress: []v1.LoadBalancerIngress{{
|
||||||
|
IP: testLoadBalancerIP,
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
sct.Update(nil, svc)
|
||||||
|
svcPortMap := make(proxy.ServicePortMap)
|
||||||
|
_ = svcPortMap.Update(sct)
|
||||||
|
|
||||||
|
ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil)
|
||||||
|
eps := &discovery.EndpointSlice{
|
||||||
|
TypeMeta: metav1.TypeMeta{},
|
||||||
|
AddressType: discovery.AddressTypeIPv4,
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("%s-0", testServiceName),
|
||||||
|
Namespace: testServiceNamespace,
|
||||||
|
Labels: map[string]string{discovery.LabelServiceName: testServiceName},
|
||||||
|
},
|
||||||
|
Endpoints: []discovery.Endpoint{
|
||||||
|
{
|
||||||
|
Addresses: []string{testServingEndpointIP},
|
||||||
|
Conditions: discovery.EndpointConditions{Serving: ptr.To(true)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []string{testNonServingEndpointIP},
|
||||||
|
Conditions: discovery.EndpointConditions{Serving: ptr.To(false)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Ports: []discovery.EndpointPort{
|
||||||
|
{
|
||||||
|
Name: ptr.To("test-udp"),
|
||||||
|
Port: ptr.To(int32(testServicePort)),
|
||||||
|
Protocol: ptr.To(v1.ProtocolUDP),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ect.EndpointSliceUpdate(eps, false)
|
||||||
|
endpointsMap := make(proxy.EndpointsMap)
|
||||||
|
_ = endpointsMap.Update(ect)
|
||||||
|
|
||||||
|
genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow {
|
||||||
|
return &netlink.ConntrackFlow{
|
||||||
|
FamilyType: unix.AF_INET,
|
||||||
|
Forward: netlink.IPTuple{
|
||||||
|
DstIP: netutils.ParseIPSloppy(testExternalIP),
|
||||||
|
DstPort: dstPort,
|
||||||
|
Protocol: unix.IPPROTO_UDP,
|
||||||
|
},
|
||||||
|
Reverse: netlink.IPTuple{
|
||||||
|
Protocol: unix.IPPROTO_UDP,
|
||||||
|
SrcIP: netutils.ParseIPSloppy(revSrc),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fake := NewFake()
|
||||||
|
// 1 valid entry
|
||||||
|
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP))
|
||||||
|
expectedEntries := 1
|
||||||
|
// 1 stale entry
|
||||||
|
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP))
|
||||||
|
expectedDeleted := 1
|
||||||
|
// 1 M to the Service IP with random ports
|
||||||
|
for i := 0; i < 1000*1000; i++ {
|
||||||
|
port := uint16(rand.Intn(65535))
|
||||||
|
if port == testServicePort {
|
||||||
|
expectedDeleted++
|
||||||
|
} else {
|
||||||
|
expectedEntries++
|
||||||
|
}
|
||||||
|
fake.entries = append(fake.entries, genConntrackEntry(port, testDeletedEndpointIP))
|
||||||
|
}
|
||||||
|
|
||||||
|
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
|
||||||
|
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
|
||||||
|
if len(actualEntries) != expectedEntries {
|
||||||
|
t.Errorf("unexpected number of entries, got %d expected %d", len(actualEntries), expectedEntries)
|
||||||
|
}
|
||||||
|
// expected conntrack entries
|
||||||
|
// 1 for CleanStaleEntries + 1 for ListEntries + 1 for ConntrackDeleteFilters to dump the conntrack table
|
||||||
|
// n for the expected deleted stale entries
|
||||||
|
t.Logf("expected deleted %d", expectedDeleted)
|
||||||
|
if fake.netlinkRequests != 3+expectedDeleted {
|
||||||
|
t.Errorf("expected %d netlink requests, got %d", 3+expectedDeleted, fake.netlinkRequests)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServiceWithoutEndpoints(t *testing.T) {
|
||||||
|
sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
|
||||||
|
svc := &v1.Service{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: testServiceName,
|
||||||
|
Namespace: testServiceNamespace,
|
||||||
|
},
|
||||||
|
Spec: v1.ServiceSpec{
|
||||||
|
ClusterIP: testClusterIP,
|
||||||
|
ExternalIPs: []string{testExternalIP},
|
||||||
|
Ports: []v1.ServicePort{
|
||||||
|
{
|
||||||
|
Name: "test-udp",
|
||||||
|
Port: testServicePort,
|
||||||
|
Protocol: v1.ProtocolUDP,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: v1.ServiceStatus{
|
||||||
|
LoadBalancer: v1.LoadBalancerStatus{
|
||||||
|
Ingress: []v1.LoadBalancerIngress{{
|
||||||
|
IP: testLoadBalancerIP,
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
sct.Update(nil, svc)
|
||||||
|
svcPortMap := make(proxy.ServicePortMap)
|
||||||
|
_ = svcPortMap.Update(sct)
|
||||||
|
|
||||||
|
ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil)
|
||||||
|
eps := &discovery.EndpointSlice{
|
||||||
|
TypeMeta: metav1.TypeMeta{},
|
||||||
|
AddressType: discovery.AddressTypeIPv4,
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: fmt.Sprintf("%s-0", testServiceName),
|
||||||
|
Namespace: testServiceNamespace,
|
||||||
|
Labels: map[string]string{discovery.LabelServiceName: "non-existing-service"},
|
||||||
|
},
|
||||||
|
Endpoints: []discovery.Endpoint{
|
||||||
|
{
|
||||||
|
Addresses: []string{testServingEndpointIP},
|
||||||
|
Conditions: discovery.EndpointConditions{Serving: ptr.To(true)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []string{testNonServingEndpointIP},
|
||||||
|
Conditions: discovery.EndpointConditions{Serving: ptr.To(false)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Ports: []discovery.EndpointPort{
|
||||||
|
{
|
||||||
|
Name: ptr.To("test-udp"),
|
||||||
|
Port: ptr.To(int32(testServicePort)),
|
||||||
|
Protocol: ptr.To(v1.ProtocolUDP),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ect.EndpointSliceUpdate(eps, false)
|
||||||
|
endpointsMap := make(proxy.EndpointsMap)
|
||||||
|
_ = endpointsMap.Update(ect)
|
||||||
|
|
||||||
|
genConntrackEntry := func(dstPort uint16, revSrc string) *netlink.ConntrackFlow {
|
||||||
|
return &netlink.ConntrackFlow{
|
||||||
|
FamilyType: unix.AF_INET,
|
||||||
|
Forward: netlink.IPTuple{
|
||||||
|
DstIP: netutils.ParseIPSloppy(testExternalIP),
|
||||||
|
DstPort: dstPort,
|
||||||
|
Protocol: unix.IPPROTO_UDP,
|
||||||
|
},
|
||||||
|
Reverse: netlink.IPTuple{
|
||||||
|
Protocol: unix.IPPROTO_UDP,
|
||||||
|
SrcIP: netutils.ParseIPSloppy(revSrc),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fake := NewFake()
|
||||||
|
// 1 valid entry
|
||||||
|
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testServingEndpointIP))
|
||||||
|
// 1 stale entry
|
||||||
|
fake.entries = append(fake.entries, genConntrackEntry(uint16(testServicePort), testDeletedEndpointIP))
|
||||||
|
|
||||||
|
CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
|
||||||
|
actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
|
||||||
|
if len(actualEntries) != 2 {
|
||||||
|
t.Errorf("unexpected number of entries, got %d expected %d", len(actualEntries), 2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -300,6 +531,7 @@ func TestFilterForNAT(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
orig string
|
orig string
|
||||||
dest string
|
dest string
|
||||||
|
dstPort uint16
|
||||||
protocol v1.Protocol
|
protocol v1.Protocol
|
||||||
expectedFilter *conntrackFilter
|
expectedFilter *conntrackFilter
|
||||||
}{
|
}{
|
||||||
@ -329,7 +561,7 @@ func TestFilterForNAT(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
require.Equal(t, tc.expectedFilter, filterForNAT(tc.orig, tc.dest, tc.protocol))
|
require.Equal(t, tc.expectedFilter, filterForIPPortNAT(tc.orig, tc.dest, tc.dstPort, tc.protocol))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,8 @@ import (
|
|||||||
|
|
||||||
// FakeInterface implements Interface by just recording entries that have been cleared.
|
// FakeInterface implements Interface by just recording entries that have been cleared.
|
||||||
type FakeInterface struct {
|
type FakeInterface struct {
|
||||||
entries []*netlink.ConntrackFlow
|
entries []*netlink.ConntrackFlow
|
||||||
|
netlinkRequests int // try to get the estimated number of netlink request
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Interface = &FakeInterface{}
|
var _ Interface = &FakeInterface{}
|
||||||
@ -37,18 +38,30 @@ func NewFake() *FakeInterface {
|
|||||||
|
|
||||||
// ListEntries is part of Interface
|
// ListEntries is part of Interface
|
||||||
func (fake *FakeInterface) ListEntries(_ uint8) ([]*netlink.ConntrackFlow, error) {
|
func (fake *FakeInterface) ListEntries(_ uint8) ([]*netlink.ConntrackFlow, error) {
|
||||||
return fake.entries, nil
|
entries := make([]*netlink.ConntrackFlow, len(fake.entries))
|
||||||
|
copy(entries, fake.entries)
|
||||||
|
// 1 netlink request to dump the table
|
||||||
|
// https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L93-L94
|
||||||
|
fake.netlinkRequests++
|
||||||
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearEntries is part of Interface
|
// ClearEntries is part of Interface
|
||||||
func (fake *FakeInterface) ClearEntries(_ uint8, filters ...netlink.CustomConntrackFilter) (int, error) {
|
func (fake *FakeInterface) ClearEntries(_ uint8, filters ...netlink.CustomConntrackFilter) (int, error) {
|
||||||
var flows []*netlink.ConntrackFlow
|
var flows []*netlink.ConntrackFlow
|
||||||
before := len(fake.entries)
|
before := len(fake.entries)
|
||||||
|
// 1 netlink request to dump the table
|
||||||
|
// https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L163
|
||||||
|
fake.netlinkRequests++
|
||||||
|
|
||||||
for _, flow := range fake.entries {
|
for _, flow := range fake.entries {
|
||||||
var matched bool
|
var matched bool
|
||||||
for _, filter := range filters {
|
for _, filter := range filters {
|
||||||
matched = filter.MatchConntrackFlow(flow)
|
matched = filter.MatchConntrackFlow(flow)
|
||||||
if matched {
|
if matched {
|
||||||
|
// 1 netlink request to delete the flow
|
||||||
|
// https://github.com/vishvananda/netlink/blob/0af32151e72b990c271ef6268e8aadb7e015f2bd/conntrack_linux.go#L182
|
||||||
|
fake.netlinkRequests++
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user