mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-05 10:19:50 +00:00)

commit 1ad8880c0f (parent ba3940c2e0)

proxy/conntrack: reconciler

Signed-off-by: Daman Arora <aroradaman@gmail.com>
pkg/proxy/conntrack/cleanup.go

@@ -20,6 +20,8 @@ limitations under the License.
 package conntrack
 
 import (
+	"time"
+
 	"github.com/vishvananda/netlink"
 	"golang.org/x/sys/unix"
 
@@ -27,88 +29,96 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/proxy"
-	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
 	netutils "k8s.io/utils/net"
 )
 
-// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
-func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap,
-	serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
-	deleteStaleServiceConntrackEntries(ct, ipFamily, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
-	deleteStaleEndpointConntrackEntries(ct, ipFamily, svcPortMap, endpointsUpdateResult)
-}
-
-// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
-// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
-// may create "black hole" entries for that IP+port. When the service gets endpoints we
-// need to delete those entries so further traffic doesn't get dropped.
-func deleteStaleServiceConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
-	var filters []netlink.CustomConntrackFilter
-	conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
-	conntrackCleanupServiceNodePorts := sets.New[int]()
-
-	// merge newly active services gathered from endpointsUpdateResult
-	// a UDP service that changes from 0 to non-0 endpoints is newly active.
-	for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices {
-		if svcInfo, ok := svcPortMap[svcPortName]; ok {
-			klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
-			conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
-			for _, extIP := range svcInfo.ExternalIPs() {
-				conntrackCleanupServiceIPs.Insert(extIP.String())
-			}
-			for _, lbIP := range svcInfo.LoadBalancerVIPs() {
-				conntrackCleanupServiceIPs.Insert(lbIP.String())
-			}
-			nodePort := svcInfo.NodePort()
-			if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
-				conntrackCleanupServiceNodePorts.Insert(nodePort)
-			}
-		}
-	}
-
-	klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
-	for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
-		filters = append(filters, filterForIP(svcIP, v1.ProtocolUDP))
-	}
-	klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
-	for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
-		filters = append(filters, filterForPort(nodePort, v1.ProtocolUDP))
-	}
-
-	if n, err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil {
-		klog.ErrorS(err, "Failed to delete stale service connections")
-	} else {
-		klog.V(4).InfoS("Deleted conntrack stale entries for services", "count", n)
-	}
-}
-
-// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
-// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
-// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
-func deleteStaleEndpointConntrackEntries(ct Interface, ipFamily v1.IPFamily, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
-	var filters []netlink.CustomConntrackFilter
-	for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
-		if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
-			endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
-			nodePort := svcInfo.NodePort()
-			if nodePort != 0 {
-				filters = append(filters, filterForPortNAT(endpointIP, nodePort, v1.ProtocolUDP))
-			}
-			filters = append(filters, filterForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP))
-			for _, extIP := range svcInfo.ExternalIPs() {
-				filters = append(filters, filterForNAT(extIP.String(), endpointIP, v1.ProtocolUDP))
-			}
-			for _, lbIP := range svcInfo.LoadBalancerVIPs() {
-				filters = append(filters, filterForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP))
-			}
-		}
-	}
-
-	if n, err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil {
-		klog.ErrorS(err, "Failed to delete stale endpoint connections")
-	} else {
-		klog.V(4).InfoS("Deleted conntrack stale entries for endpoints", "count", n)
-	}
-}
+// CleanStaleEntries scans conntrack table and removes any entries
+// for a service that do not correspond to a serving endpoint.
+func CleanStaleEntries(ct Interface, ipFamily v1.IPFamily,
+	svcPortMap proxy.ServicePortMap, endpointsMap proxy.EndpointsMap) {
+	start := time.Now()
+	klog.V(4).InfoS("Started to reconcile conntrack entries", "ipFamily", ipFamily)
+
+	entries, err := ct.ListEntries(ipFamilyMap[ipFamily])
+	if err != nil {
+		klog.ErrorS(err, "Failed to list conntrack entries")
+		return
+	}
+
+	// serviceIPEndpointIPs maps service IPs (ClusterIP, LoadBalancerIPs and ExternalIPs)
+	// to the set of serving endpoint IPs.
+	serviceIPEndpointIPs := make(map[string]sets.Set[string])
+	// serviceNodePortEndpointIPs maps service NodePort to the set of serving endpoint IPs.
+	serviceNodePortEndpointIPs := make(map[int]sets.Set[string])
+
+	for svcName, svc := range svcPortMap {
+		// we are only interested in UDP services
+		if svc.Protocol() != v1.ProtocolUDP {
+			continue
+		}
+
+		endpointIPs := sets.New[string]()
+		for _, endpoint := range endpointsMap[svcName] {
+			// We need to remove all the conntrack entries for a Service (IP or NodePort)
+			// that are not pointing to a serving endpoint.
+			// We map all the serving endpoint IPs to the service and clear all the conntrack
+			// entries which are destined for the service and are not DNATed to these endpoints.
+			// Changes to the service should not affect existing flows, so we do not take
+			// traffic policies, topology, or terminating status of the service into account.
+			// This ensures that the behavior of UDP services remains consistent with TCP
+			// services.
+			if endpoint.IsServing() {
+				endpointIPs.Insert(endpoint.IP())
+			}
+		}
+
+		serviceIPEndpointIPs[svc.ClusterIP().String()] = endpointIPs
+		for _, loadBalancerIP := range svc.LoadBalancerVIPs() {
+			serviceIPEndpointIPs[loadBalancerIP.String()] = endpointIPs
+		}
+		for _, externalIP := range svc.ExternalIPs() {
+			serviceIPEndpointIPs[externalIP.String()] = endpointIPs
+		}
+		if svc.NodePort() != 0 {
+			serviceNodePortEndpointIPs[svc.NodePort()] = endpointIPs
+		}
+	}
+
+	var filters []netlink.CustomConntrackFilter
+	for _, entry := range entries {
+		// we only deal with UDP protocol entries
+		if entry.Forward.Protocol != unix.IPPROTO_UDP {
+			continue
+		}
+
+		origDst := entry.Forward.DstIP.String()
+		origPortDst := int(entry.Forward.DstPort)
+		replySrc := entry.Reverse.SrcIP.String()
+
+		// if the original destination (--orig-dst) of the entry is service IP (ClusterIP,
+		// LoadBalancerIPs or ExternalIPs) and the reply source (--reply-src) is not IP of
+		// any serving endpoint, we clear the entry.
+		if _, ok := serviceIPEndpointIPs[origDst]; ok {
+			if !serviceIPEndpointIPs[origDst].Has(replySrc) {
+				filters = append(filters, filterForNAT(origDst, replySrc, v1.ProtocolUDP))
+			}
+		}
+
+		// if the original port destination (--orig-port-dst) of the flow is service
+		// NodePort and the reply source (--reply-src) is not IP of any serving endpoint,
+		// we clear the entry.
+		if _, ok := serviceNodePortEndpointIPs[origPortDst]; ok {
+			if !serviceNodePortEndpointIPs[origPortDst].Has(replySrc) {
+				filters = append(filters, filterForPortNAT(replySrc, origPortDst, v1.ProtocolUDP))
+			}
+		}
+	}
+
+	if n, err := ct.ClearEntries(ipFamilyMap[ipFamily], filters...); err != nil {
+		klog.ErrorS(err, "Failed to clear all conntrack entries", "ipFamily", ipFamily, "entriesDeleted", n, "took", time.Since(start))
+	} else {
+		klog.V(4).InfoS("Finished reconciling conntrack entries", "ipFamily", ipFamily, "entriesDeleted", n, "took", time.Since(start))
+	}
+}
 
@@ -126,30 +136,6 @@ var protocolMap = map[v1.Protocol]uint8{
 	v1.ProtocolSCTP: unix.IPPROTO_SCTP,
 }
 
-// filterForIP returns *conntrackFilter to delete the conntrack entries for connections
-// specified by the destination IP (original direction).
-func filterForIP(ip string, protocol v1.Protocol) *conntrackFilter {
-	klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", ip, "protocol", protocol)
-	return &conntrackFilter{
-		protocol: protocolMap[protocol],
-		original: &connectionTuple{
-			dstIP: netutils.ParseIPSloppy(ip),
-		},
-	}
-}
-
-// filterForPort returns *conntrackFilter to delete the conntrack entries for connections
-// specified by the destination Port (original direction).
-func filterForPort(port int, protocol v1.Protocol) *conntrackFilter {
-	klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-port-dst", port, "protocol", protocol)
-	return &conntrackFilter{
-		protocol: protocolMap[protocol],
-		original: &connectionTuple{
-			dstPort: uint16(port),
-		},
-	}
-}
-
 // filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
 // specified by the destination IP (original direction) and source IP (reply direction).
 func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter {
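
Note (not part of this commit): the reconciliation rule described in the comments of the new CleanStaleEntries can be restated as a predicate over a single conntrack flow. The sketch below is illustrative only; isStaleUDPFlow and its plain-map arguments are made-up names, not code from this change, and plain maps stand in for the sets.Set used above.

    // Sketch only — not code from this commit. isStaleUDPFlow is a hypothetical
    // restatement of the rule described in the comments of CleanStaleEntries above.
    package main

    import (
        "fmt"
        "net"

        "github.com/vishvananda/netlink"
        "golang.org/x/sys/unix"
    )

    // isStaleUDPFlow reports whether one conntrack flow should be cleared, given the
    // two lookups the reconciler builds: service IP -> serving endpoint IPs and
    // service NodePort -> serving endpoint IPs.
    func isStaleUDPFlow(flow *netlink.ConntrackFlow,
        serviceIPEndpointIPs map[string]map[string]bool,
        serviceNodePortEndpointIPs map[int]map[string]bool) bool {
        // Only UDP flows are reconciled, matching the check in CleanStaleEntries.
        if flow.Forward.Protocol != unix.IPPROTO_UDP {
            return false
        }
        origDst := flow.Forward.DstIP.String()   // --orig-dst
        origPortDst := int(flow.Forward.DstPort) // --orig-port-dst
        replySrc := flow.Reverse.SrcIP.String()  // --reply-src

        // Destined to a known service IP but not DNATed to any serving endpoint.
        if eps, ok := serviceIPEndpointIPs[origDst]; ok && !eps[replySrc] {
            return true
        }
        // Destined to a known service NodePort but not DNATed to any serving endpoint.
        if eps, ok := serviceNodePortEndpointIPs[origPortDst]; ok && !eps[replySrc] {
            return true
        }
        return false
    }

    func main() {
        // A UDP flow to a ClusterIP that was answered by an endpoint which is no longer serving.
        flow := &netlink.ConntrackFlow{
            FamilyType: unix.AF_INET,
            Forward:    netlink.IPTuple{DstIP: net.ParseIP("172.30.1.1"), Protocol: unix.IPPROTO_UDP},
            Reverse:    netlink.IPTuple{SrcIP: net.ParseIP("10.240.2.6"), Protocol: unix.IPPROTO_UDP},
        }
        serviceIPs := map[string]map[string]bool{"172.30.1.1": {"10.240.0.4": true}}
        fmt.Println(isStaleUDPFlow(flow, serviceIPs, nil)) // prints: true
    }
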
pkg/proxy/conntrack/cleanup_test.go

@@ -20,62 +20,75 @@ limitations under the License.
 package conntrack
 
 import (
-	"net"
-	"reflect"
+	"fmt"
+	"sort"
 	"testing"
 
 	"github.com/stretchr/testify/require"
 	"github.com/vishvananda/netlink"
+	"golang.org/x/sys/unix"
 
 	v1 "k8s.io/api/core/v1"
+	discovery "k8s.io/api/discovery/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/proxy"
 	netutils "k8s.io/utils/net"
+	"k8s.io/utils/ptr"
 )
 
 const (
+	testServiceName      = "cleanup-test"
+	testServiceNamespace = "test"
+
 	testIPFamily       = v1.IPv4Protocol
 	testClusterIP      = "172.30.1.1"
 	testExternalIP     = "192.168.99.100"
 	testLoadBalancerIP = "1.2.3.4"
 
-	testEndpointIP = "10.240.0.4"
+	testServingEndpointIP    = "10.240.0.4"
+	testNonServingEndpointIP = "10.240.1.5"
+	testDeletedEndpointIP    = "10.240.2.6"
 
-	testPort         = 53
-	testNodePort     = 5353
-	testEndpointPort = "5300"
+	testPort     = 8000
+	testNodePort = 32000
 )
 
 func TestCleanStaleEntries(t *testing.T) {
-	// We need to construct a proxy.ServicePortMap to pass to CleanStaleEntries.
-	// ServicePortMap is just map[string]proxy.ServicePort, but there are no public
-	// constructors for any implementation of proxy.ServicePort, so we have to either
-	// provide our own implementation of that interface, or else use a
-	// proxy.ServiceChangeTracker to construct them and fill in the map for us.
+	// We need to construct proxy.ServicePortMap and proxy.EndpointsMap to pass to
+	// CleanStaleEntries. ServicePortMap and EndpointsMap are just maps, but there are
+	// no public constructors for any implementation of proxy.ServicePort and
+	// proxy.EndpointsMap, so we have to either provide our own implementation of that
+	// interface, or else use a proxy.ServiceChangeTracker and proxy.NewEndpointsChangeTracker
+	// to construct them and fill in the maps for us.
 
 	sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
 	svc := &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      "cleanup-test",
-			Namespace: "test",
+			Name:      testServiceName,
+			Namespace: testServiceNamespace,
 		},
 		Spec: v1.ServiceSpec{
 			ClusterIP:   testClusterIP,
 			ExternalIPs: []string{testExternalIP},
 			Ports: []v1.ServicePort{
 				{
-					Name:     "dns-tcp",
+					Name:     "test-tcp",
 					Port:     testPort,
 					Protocol: v1.ProtocolTCP,
 				},
 				{
-					Name:     "dns-udp",
+					Name:     "test-udp",
 					Port:     testPort,
 					NodePort: testNodePort,
 					Protocol: v1.ProtocolUDP,
 				},
+				{
+					Name:     "test-sctp",
+					Port:     testPort,
+					NodePort: testNodePort,
+					Protocol: v1.ProtocolSCTP,
+				},
 			},
 		},
 		Status: v1.ServiceStatus{
@@ -86,14 +99,52 @@ func TestCleanStaleEntries(t *testing.T) {
 			},
 		},
 	}
-	sct.Update(nil, svc)
+
+	sct.Update(nil, svc)
 	svcPortMap := make(proxy.ServicePortMap)
 	_ = svcPortMap.Update(sct)
 
-	// (At this point we are done with sct, and in particular, we don't use sct to
-	// construct UpdateServiceMapResults, because pkg/proxy already has its own tests
-	// for that. Also, svcPortMap is read-only from this point on.)
+	ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil)
+	eps := &discovery.EndpointSlice{
+		TypeMeta:    metav1.TypeMeta{},
+		AddressType: discovery.AddressTypeIPv4,
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      fmt.Sprintf("%s-0", testServiceName),
+			Namespace: testServiceNamespace,
+			Labels:    map[string]string{discovery.LabelServiceName: testServiceName},
+		},
+		Endpoints: []discovery.Endpoint{
+			{
+				Addresses:  []string{testServingEndpointIP},
+				Conditions: discovery.EndpointConditions{Serving: ptr.To(true)},
+			},
+			{
+				Addresses:  []string{testNonServingEndpointIP},
+				Conditions: discovery.EndpointConditions{Serving: ptr.To(false)},
+			},
+		},
+		Ports: []discovery.EndpointPort{
+			{
+				Name:     ptr.To("test-tcp"),
+				Port:     ptr.To(int32(testPort)),
+				Protocol: ptr.To(v1.ProtocolTCP),
+			},
+			{
+				Name:     ptr.To("test-udp"),
+				Port:     ptr.To(int32(testPort)),
+				Protocol: ptr.To(v1.ProtocolUDP),
+			},
+			{
+				Name:     ptr.To("test-sctp"),
+				Port:     ptr.To(int32(testPort)),
+				Protocol: ptr.To(v1.ProtocolSCTP),
+			},
+		},
+	}
+
+	ect.EndpointSliceUpdate(eps, false)
+	endpointsMap := make(proxy.EndpointsMap)
+	_ = endpointsMap.Update(ect)
 
 	tcpPortName := proxy.ServicePortName{
 		NamespacedName: types.NamespacedName{
@@ -113,227 +164,134 @@ func TestCleanStaleEntries(t *testing.T) {
 		Protocol: svc.Spec.Ports[1].Protocol,
 	}
 
-	unknownPortName := udpPortName
-	unknownPortName.Namespace = "unknown"
+	sctpPortName := proxy.ServicePortName{
+		NamespacedName: types.NamespacedName{
+			Namespace: svc.Namespace,
+			Name:      svc.Name,
+		},
+		Port:     svc.Spec.Ports[2].Name,
+		Protocol: svc.Spec.Ports[2].Protocol,
+	}
 
-	// Sanity-check to make sure we constructed the map correctly
-	if len(svcPortMap) != 2 {
+	// Sanity-check to make sure we constructed the ServicePortMap correctly
+	if len(svcPortMap) != 3 {
 		t.Fatalf("expected svcPortMap to have 2 entries, got %+v", svcPortMap)
 	}
 	servicePort := svcPortMap[tcpPortName]
-	if servicePort == nil || servicePort.String() != "172.30.1.1:53/TCP" {
-		t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/TCP\", got %q", tcpPortName.String(), servicePort.String())
+	if servicePort == nil || servicePort.String() != "172.30.1.1:8000/TCP" {
+		t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:8000/TCP\", got %q", tcpPortName.String(), servicePort.String())
 	}
 	servicePort = svcPortMap[udpPortName]
-	if servicePort == nil || servicePort.String() != "172.30.1.1:53/UDP" {
-		t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/UDP\", got %q", udpPortName.String(), servicePort.String())
+	if servicePort == nil || servicePort.String() != "172.30.1.1:8000/UDP" {
+		t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:8000/UDP\", got %q", udpPortName.String(), servicePort.String())
+	}
+	servicePort = svcPortMap[sctpPortName]
+	if servicePort == nil || servicePort.String() != "172.30.1.1:8000/SCTP" {
+		t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:8000/SCTP\", got %q", sctpPortName.String(), servicePort.String())
 	}
 
-	testCases := []struct {
-		description string
-
-		serviceUpdates   proxy.UpdateServiceMapResult
-		endpointsUpdates proxy.UpdateEndpointsMapResult
-
-		result FakeInterface
-	}{
-		{
-			description: "DeletedUDPClusterIPs clears entries for given clusterIPs (only)",
-
-			serviceUpdates: proxy.UpdateServiceMapResult{
-				// Note: this isn't testClusterIP; it's the IP of some
-				// unknown (because deleted) service.
-				DeletedUDPClusterIPs: sets.New("172.30.99.99"),
-			},
-			endpointsUpdates: proxy.UpdateEndpointsMapResult{},
-
-			result: FakeInterface{
-				ClearedIPs:      sets.New("172.30.99.99"),
-				ClearedPorts:    sets.New[int](),
-				ClearedNATs:     map[string]string{},
-				ClearedPortNATs: map[int]string{},
-			},
-		},
-		{
-			description: "DeletedUDPEndpoints clears NAT entries for all IPs and NodePorts",
-
-			serviceUpdates: proxy.UpdateServiceMapResult{
-				DeletedUDPClusterIPs: sets.New[string](),
-			},
-			endpointsUpdates: proxy.UpdateEndpointsMapResult{
-				DeletedUDPEndpoints: []proxy.ServiceEndpoint{{
-					Endpoint:        net.JoinHostPort(testEndpointIP, testEndpointPort),
-					ServicePortName: udpPortName,
-				}},
-			},
-
-			result: FakeInterface{
-				ClearedIPs:   sets.New[string](),
-				ClearedPorts: sets.New[int](),
-				ClearedNATs: map[string]string{
-					testClusterIP:      testEndpointIP,
-					testExternalIP:     testEndpointIP,
-					testLoadBalancerIP: testEndpointIP,
-				},
-				ClearedPortNATs: map[int]string{
-					testNodePort: testEndpointIP,
-				},
-			},
-		},
-		{
-			description: "NewlyActiveUDPServices clears entries for all IPs and NodePorts",
-
-			serviceUpdates: proxy.UpdateServiceMapResult{
-				DeletedUDPClusterIPs: sets.New[string](),
-			},
-			endpointsUpdates: proxy.UpdateEndpointsMapResult{
-				DeletedUDPEndpoints: []proxy.ServiceEndpoint{},
-				NewlyActiveUDPServices: []proxy.ServicePortName{
-					udpPortName,
-				},
-			},
-
-			result: FakeInterface{
-				ClearedIPs:      sets.New(testClusterIP, testExternalIP, testLoadBalancerIP),
-				ClearedPorts:    sets.New(testNodePort),
-				ClearedNATs:     map[string]string{},
-				ClearedPortNATs: map[int]string{},
-			},
-		},
-		{
-			description: "DeletedUDPEndpoints for unknown Service has no effect",
-
-			serviceUpdates: proxy.UpdateServiceMapResult{
-				DeletedUDPClusterIPs: sets.New[string](),
-			},
-			endpointsUpdates: proxy.UpdateEndpointsMapResult{
-				DeletedUDPEndpoints: []proxy.ServiceEndpoint{{
-					Endpoint:        "10.240.0.4:80",
-					ServicePortName: unknownPortName,
-				}},
-				NewlyActiveUDPServices: []proxy.ServicePortName{},
-			},
-
-			result: FakeInterface{
-				ClearedIPs:      sets.New[string](),
-				ClearedPorts:    sets.New[int](),
-				ClearedNATs:     map[string]string{},
-				ClearedPortNATs: map[int]string{},
-			},
-		},
-		{
-			description: "NewlyActiveUDPServices for unknown Service has no effect",
-
-			serviceUpdates: proxy.UpdateServiceMapResult{
-				DeletedUDPClusterIPs: sets.New[string](),
-			},
-			endpointsUpdates: proxy.UpdateEndpointsMapResult{
-				DeletedUDPEndpoints: []proxy.ServiceEndpoint{},
-				NewlyActiveUDPServices: []proxy.ServicePortName{
-					unknownPortName,
-				},
-			},
-
-			result: FakeInterface{
-				ClearedIPs:      sets.New[string](),
-				ClearedPorts:    sets.New[int](),
-				ClearedNATs:     map[string]string{},
-				ClearedPortNATs: map[int]string{},
-			},
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.description, func(t *testing.T) {
-			fake := NewFake()
-			CleanStaleEntries(fake, testIPFamily, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates)
-			if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) {
-				t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs)
-			}
-			if !fake.ClearedPorts.Equal(tc.result.ClearedPorts) {
-				t.Errorf("Expected ClearedPorts=%v, got %v", tc.result.ClearedPorts, fake.ClearedPorts)
-			}
-			if !reflect.DeepEqual(fake.ClearedNATs, tc.result.ClearedNATs) {
-				t.Errorf("Expected ClearedNATs=%v, got %v", tc.result.ClearedNATs, fake.ClearedNATs)
-			}
-			if !reflect.DeepEqual(fake.ClearedPortNATs, tc.result.ClearedPortNATs) {
-				t.Errorf("Expected ClearedPortNATs=%v, got %v", tc.result.ClearedPortNATs, fake.ClearedPortNATs)
-			}
-		})
-	}
-}
-
-func TestFilterForIP(t *testing.T) {
-	testCases := []struct {
-		name           string
-		ip             string
-		protocol       v1.Protocol
-		expectedFamily netlink.InetFamily
-		expectedFilter *conntrackFilter
-	}{
-		{
-			name:     "ipv4 + UDP",
-			ip:       "10.96.0.10",
-			protocol: v1.ProtocolUDP,
-			expectedFilter: &conntrackFilter{
-				protocol: 17,
-				original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10")},
-			},
-		},
-		{
-			name:     "ipv6 + TCP",
-			ip:       "2001:db8:1::2",
-			protocol: v1.ProtocolTCP,
-			expectedFilter: &conntrackFilter{
-				protocol: 6,
-				original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2")},
-			},
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			require.Equal(t, tc.expectedFilter, filterForIP(tc.ip, tc.protocol))
-		})
-	}
-}
-
-func TestFilterForPort(t *testing.T) {
-	testCases := []struct {
-		name           string
-		port           int
-		protocol       v1.Protocol
-		expectedFilter *conntrackFilter
-	}{
-		{
-			name:     "UDP",
-			port:     5000,
-			protocol: v1.ProtocolUDP,
-			expectedFilter: &conntrackFilter{
-				protocol: 17,
-				original: &connectionTuple{dstPort: 5000},
-			},
-		},
-		{
-			name:     "SCTP",
-			port:     3000,
-			protocol: v1.ProtocolSCTP,
-			expectedFilter: &conntrackFilter{
-				protocol: 132,
-				original: &connectionTuple{dstPort: 3000},
-			},
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			require.Equal(t, tc.expectedFilter, filterForPort(tc.port, tc.protocol))
-		})
-	}
-}
+	// Sanity-check to make sure we constructed the EndpointsMap map correctly
+	if len(endpointsMap) != 3 {
+		t.Fatalf("expected endpointsMap to have 3 entries, got %+v", endpointsMap)
+	}
+	for _, svcPortName := range []proxy.ServicePortName{tcpPortName, udpPortName, sctpPortName} {
+		if len(endpointsMap[svcPortName]) != 2 {
+			t.Fatalf("expected endpointsMap[%q] to have 2 entries, got %+v", svcPortName.String(), endpointsMap[svcPortName])
+		}
+		if endpointsMap[svcPortName][0].IP() != "10.240.0.4" {
+			t.Fatalf("expected endpointsMap[%q][0] IP to be \"10.240.0.4\", got \"%s\"", svcPortName.String(), endpointsMap[svcPortName][0].IP())
+		}
+		if endpointsMap[svcPortName][1].IP() != "10.240.1.5" {
+			t.Fatalf("expected endpointsMap[%q][1] IP to be \"10.240.1.5\", got \"%s\"", svcPortName.String(), endpointsMap[svcPortName][1].IP())
+		}
+		if !endpointsMap[svcPortName][0].IsServing() {
+			t.Fatalf("expected endpointsMap[%q][0] to be serving", svcPortName.String())
+		}
+		if endpointsMap[svcPortName][1].IsServing() {
+			t.Fatalf("expected endpointsMap[%q][1] to be not serving", svcPortName.String())
+		}
+	}
+
+	// mock existing entries before cleanup
+	// we create 36 fake flow entries ( 3 Endpoints * 3 Protocols * ( 3 (ServiceIPs) + 1 (NodePort))
+	var mockEntries []*netlink.ConntrackFlow
+	// expectedEntries are the entries on which we will assert the cleanup logic
+	var expectedEntries []*netlink.ConntrackFlow
+	for _, dnatDest := range []string{testServingEndpointIP, testNonServingEndpointIP, testDeletedEndpointIP} {
+		for _, proto := range []uint8{unix.IPPROTO_TCP, unix.IPPROTO_UDP, unix.IPPROTO_SCTP} {
+			for _, origDest := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
+				entry := &netlink.ConntrackFlow{
+					FamilyType: unix.AF_INET,
+					Forward: netlink.IPTuple{
+						DstIP:    netutils.ParseIPSloppy(origDest),
+						Protocol: proto,
+					},
+					Reverse: netlink.IPTuple{
+						Protocol: proto,
+						SrcIP:    netutils.ParseIPSloppy(dnatDest),
+					},
+				}
+				mockEntries = append(mockEntries, entry)
+				// we do not expect deleted or non-serving UDP endpoints flows to be present after cleanup
+				if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) {
+					expectedEntries = append(expectedEntries, entry)
+				}
+			}
+			entry := &netlink.ConntrackFlow{
+				FamilyType: unix.AF_INET,
+				Forward: netlink.IPTuple{
+					DstPort:  testNodePort,
+					Protocol: proto,
+				},
+				Reverse: netlink.IPTuple{
+					Protocol: proto,
+					SrcIP:    netutils.ParseIPSloppy(dnatDest),
+				},
+			}
+			mockEntries = append(mockEntries, entry)
+			// we do not expect deleted or non-serving UDP endpoints entries to be present after cleanup
+			if !(proto == unix.IPPROTO_UDP && (dnatDest == testNonServingEndpointIP || dnatDest == testDeletedEndpointIP)) {
+				expectedEntries = append(expectedEntries, entry)
+			}
+		}
+	}
+
+	// add some non-DNATed mock entries which should be cleared up by reconciler
+	// These will exist if the proxy don't have DROP/REJECT rule for service with
+	// no endpoints, --orig-dst and --reply-src will be same for these entries.
+	for _, ip := range []string{testClusterIP, testLoadBalancerIP, testExternalIP} {
+		entry := &netlink.ConntrackFlow{
+			FamilyType: unix.AF_INET,
+			Forward: netlink.IPTuple{
+				DstIP:    netutils.ParseIPSloppy(ip),
+				Protocol: unix.IPPROTO_UDP,
+			},
+			Reverse: netlink.IPTuple{
+				Protocol: unix.IPPROTO_UDP,
+				SrcIP:    netutils.ParseIPSloppy(ip),
+			},
+		}
+		mockEntries = append(mockEntries, entry)
+	}
+
+	fake := NewFake()
+	fake.entries = mockEntries
+	CleanStaleEntries(fake, testIPFamily, svcPortMap, endpointsMap)
+
+	actualEntries, _ := fake.ListEntries(ipFamilyMap[testIPFamily])
+	require.Equal(t, len(expectedEntries), len(actualEntries))
+
+	// sort the actual flows before comparison
+	sort.Slice(actualEntries, func(i, j int) bool {
+		return actualEntries[i].String() < actualEntries[j].String()
+	})
+	// sort the expected flows before comparison
+	sort.Slice(expectedEntries, func(i, j int) bool {
+		return expectedEntries[i].String() < expectedEntries[j].String()
+	})
+
+	for i := 0; i < len(expectedEntries); i++ {
+		require.Equal(t, expectedEntries[i], actualEntries[i])
+	}
+}
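
Note (not part of this commit): the mock conntrack table built in the rewritten test works out to 3 endpoints × 3 protocols × (3 service IPs + 1 NodePort) = 36 DNATed flows, plus 3 non-DNATed flows (one per service IP), i.e. 39 entries in total. The reconciler should clear the UDP flows whose reply source is the non-serving or the deleted endpoint (2 endpoints × 4 destinations = 8 flows) and the 3 non-DNATed UDP flows, so expectedEntries holds the remaining 39 − 11 = 28 flows that the final assertions compare against.
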
pkg/proxy/conntrack/conntrack.go

@@ -29,6 +29,7 @@ import (
 
 // Interface for dealing with conntrack
 type Interface interface {
+	ListEntries(ipFamily uint8) ([]*netlink.ConntrackFlow, error)
 	// ClearEntries deletes conntrack entries for connections of the given IP family,
 	// filtered by the given filters.
 	ClearEntries(ipFamily uint8, filters ...netlink.CustomConntrackFilter) (int, error)
@@ -36,6 +37,7 @@ type Interface interface {
 
 // netlinkHandler allows consuming real and mockable implementation for testing.
 type netlinkHandler interface {
+	ConntrackTableList(netlink.ConntrackTableType, netlink.InetFamily) ([]*netlink.ConntrackFlow, error)
 	ConntrackDeleteFilters(netlink.ConntrackTableType, netlink.InetFamily, ...netlink.CustomConntrackFilter) (uint, error)
 }
 
@@ -54,6 +56,11 @@ func newConntracker(handler netlinkHandler) Interface {
 	return &conntracker{handler: handler}
 }
 
+// ListEntries list all conntrack entries for connections of the given IP family.
+func (ct *conntracker) ListEntries(ipFamily uint8) ([]*netlink.ConntrackFlow, error) {
+	return ct.handler.ConntrackTableList(netlink.ConntrackTable, netlink.InetFamily(ipFamily))
+}
+
 // ClearEntries deletes conntrack entries for connections of the given IP family,
 // filtered by the given filters.
 func (ct *conntracker) ClearEntries(ipFamily uint8, filters ...netlink.CustomConntrackFilter) (int, error) {
@@ -64,8 +71,7 @@ func (ct *conntracker) ClearEntries(ipFamily uint8, filters ...netlink.CustomConntrackFilter) (int, error) {
 
 	n, err := ct.handler.ConntrackDeleteFilters(netlink.ConntrackTable, netlink.InetFamily(ipFamily), filters...)
 	if err != nil {
-		return 0, fmt.Errorf("error deleting conntrack entries, error: %w", err)
+		return int(n), fmt.Errorf("error deleting conntrack entries, error: %w", err)
 	}
-	klog.V(4).InfoS("Cleared conntrack entries", "count", n)
 	return int(n), nil
 }
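
Note (not part of this commit): both implementations of Interface accept anything that satisfies netlink.CustomConntrackFilter, i.e. a single MatchConntrackFlow method; the real conntracker hands the filters to ConntrackDeleteFilters, while the fake below applies them in memory. A minimal illustration of that contract, with a made-up udpOnly filter:

    // Sketch only — udpOnly is a hypothetical filter, not part of this commit.
    package main

    import (
        "fmt"
        "net"

        "github.com/vishvananda/netlink"
        "golang.org/x/sys/unix"
    )

    // udpOnly matches every UDP flow. Any type with this one method can be passed
    // to ClearEntries as a netlink.CustomConntrackFilter.
    type udpOnly struct{}

    func (udpOnly) MatchConntrackFlow(flow *netlink.ConntrackFlow) bool {
        return flow.Forward.Protocol == unix.IPPROTO_UDP
    }

    var _ netlink.CustomConntrackFilter = udpOnly{} // compile-time interface check

    func main() {
        filter := udpOnly{}
        flows := []*netlink.ConntrackFlow{
            {FamilyType: unix.AF_INET, Forward: netlink.IPTuple{DstIP: net.ParseIP("172.30.1.1"), Protocol: unix.IPPROTO_UDP}},
            {FamilyType: unix.AF_INET, Forward: netlink.IPTuple{DstIP: net.ParseIP("172.30.1.1"), Protocol: unix.IPPROTO_TCP}},
        }
        // Keep only the flows the filter does NOT match — the same pruning the
        // rewritten FakeInterface.ClearEntries performs over its in-memory entries.
        var kept []*netlink.ConntrackFlow
        for _, f := range flows {
            if !filter.MatchConntrackFlow(f) {
                kept = append(kept, f)
            }
        }
        fmt.Printf("kept %d of %d flows\n", len(kept), len(flows)) // kept 1 of 2 flows
    }
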
pkg/proxy/conntrack/conntrack_test.go

@@ -35,6 +35,10 @@ type fakeHandler struct {
 	filters []*conntrackFilter
 }
 
+func (f *fakeHandler) ConntrackTableList(_ netlink.ConntrackTableType, _ netlink.InetFamily) ([]*netlink.ConntrackFlow, error) {
+	return nil, nil
+}
+
 func (f *fakeHandler) ConntrackDeleteFilters(tableType netlink.ConntrackTableType, family netlink.InetFamily, netlinkFilters ...netlink.CustomConntrackFilter) (uint, error) {
 	f.tableType = tableType
 	f.ipFamily = family
@@ -48,7 +52,6 @@ func (f *fakeHandler) ConntrackDeleteFilters(tableType netlink.ConntrackTableType, family netlink.InetFamily, netlinkFilters ...netlink.CustomConntrackFilter) (uint, error) {
 var _ netlinkHandler = (*fakeHandler)(nil)
 
 func TestConntracker_ClearEntries(t *testing.T) {
-
 	testCases := []struct {
 		name     string
 		ipFamily uint8
pkg/proxy/conntrack/fake.go

@@ -20,77 +20,42 @@ limitations under the License.
 package conntrack
 
 import (
-	"fmt"
-
 	"github.com/vishvananda/netlink"
-
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 // FakeInterface implements Interface by just recording entries that have been cleared.
 type FakeInterface struct {
-	ClearedIPs      sets.Set[string]
-	ClearedPorts    sets.Set[int]
-	ClearedNATs     map[string]string // origin -> dest
-	ClearedPortNATs map[int]string    // port -> dest
+	entries []*netlink.ConntrackFlow
 }
 
 var _ Interface = &FakeInterface{}
 
 // NewFake creates a new FakeInterface
 func NewFake() *FakeInterface {
-	fake := &FakeInterface{}
-	fake.Reset()
-	return fake
+	return &FakeInterface{entries: make([]*netlink.ConntrackFlow, 0)}
 }
 
-// Reset clears fake's sets/maps
-func (fake *FakeInterface) Reset() {
-	fake.ClearedIPs = sets.New[string]()
-	fake.ClearedPorts = sets.New[int]()
-	fake.ClearedNATs = make(map[string]string)
-	fake.ClearedPortNATs = make(map[int]string)
+// ListEntries is part of Interface
+func (fake *FakeInterface) ListEntries(_ uint8) ([]*netlink.ConntrackFlow, error) {
+	return fake.entries, nil
 }
 
 // ClearEntries is part of Interface
 func (fake *FakeInterface) ClearEntries(_ uint8, filters ...netlink.CustomConntrackFilter) (int, error) {
-	for _, anyFilter := range filters {
-		filter := anyFilter.(*conntrackFilter)
-		if filter.protocol != protocolMap[v1.ProtocolUDP] {
-			return 0, fmt.Errorf("FakeInterface currently only supports UDP")
-		}
-
-		// record IP and Port entries
-		if filter.original != nil && filter.reply == nil {
-			if filter.original.dstIP != nil {
-				fake.ClearedIPs.Insert(filter.original.dstIP.String())
-			}
-			if filter.original.dstPort != 0 {
-				fake.ClearedPorts.Insert(int(filter.original.dstPort))
-			}
-		}
-
-		// record NAT and NATPort entries
-		if filter.original != nil && filter.reply != nil {
-			if filter.original.dstIP != nil && filter.reply.srcIP != nil {
-				origin := filter.original.dstIP.String()
-				dest := filter.reply.srcIP.String()
-				if previous, exists := fake.ClearedNATs[origin]; exists && previous != dest {
-					return 0, fmt.Errorf("filter for NAT passed with same origin (%s), different destination (%s / %s)", origin, previous, dest)
-				}
-				fake.ClearedNATs[filter.original.dstIP.String()] = filter.reply.srcIP.String()
-			}
-
-			if filter.original.dstPort != 0 && filter.reply.srcIP != nil {
-				dest := filter.reply.srcIP.String()
-				port := int(filter.original.dstPort)
-				if previous, exists := fake.ClearedPortNATs[port]; exists && previous != dest {
-					return 0, fmt.Errorf("filter for PortNAT passed with same port (%d), different destination (%s / %s)", port, previous, dest)
-				}
-				fake.ClearedPortNATs[port] = dest
-			}
-		}
-	}
-	return 0, nil
+	var flows []*netlink.ConntrackFlow
+	before := len(fake.entries)
+	for _, flow := range fake.entries {
+		var matched bool
+		for _, filter := range filters {
+			matched = filter.MatchConntrackFlow(flow)
+			if matched {
+				break
+			}
+		}
+		if !matched {
+			flows = append(flows, flow)
+		}
+	}
+	fake.entries = flows
+	return before - len(fake.entries), nil
 }
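
Note (not part of this commit): with the fake now storing flows instead of recording cleared IPs/NATs, an in-package test can seed fake.entries directly and assert on what ClearEntries removes. A rough sketch under that assumption (TestFakeClearEntriesSketch is a made-up name; it reuses the package's own filterForNAT helper):

    // Sketch only — a hypothetical test inside package conntrack, not part of this commit.
    package conntrack

    import (
        "testing"

        "github.com/vishvananda/netlink"
        "golang.org/x/sys/unix"

        v1 "k8s.io/api/core/v1"
        netutils "k8s.io/utils/net"
    )

    func TestFakeClearEntriesSketch(t *testing.T) {
        fake := NewFake()
        fake.entries = []*netlink.ConntrackFlow{
            // UDP flow to the ClusterIP, answered by a stale endpoint: should be cleared.
            {
                FamilyType: unix.AF_INET,
                Forward:    netlink.IPTuple{DstIP: netutils.ParseIPSloppy("172.30.1.1"), Protocol: unix.IPPROTO_UDP},
                Reverse:    netlink.IPTuple{SrcIP: netutils.ParseIPSloppy("10.240.2.6"), Protocol: unix.IPPROTO_UDP},
            },
            // TCP flow to the same ClusterIP: the UDP filter does not match it, so it survives.
            {
                FamilyType: unix.AF_INET,
                Forward:    netlink.IPTuple{DstIP: netutils.ParseIPSloppy("172.30.1.1"), Protocol: unix.IPPROTO_TCP},
                Reverse:    netlink.IPTuple{SrcIP: netutils.ParseIPSloppy("10.240.0.4"), Protocol: unix.IPPROTO_TCP},
            },
        }

        n, err := fake.ClearEntries(unix.AF_INET, filterForNAT("172.30.1.1", "10.240.2.6", v1.ProtocolUDP))
        if err != nil {
            t.Fatal(err)
        }
        if n != 1 || len(fake.entries) != 1 {
            t.Fatalf("expected one flow cleared and one remaining, got n=%d, remaining=%d", n, len(fake.entries))
        }
    }
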
pkg/proxy/iptables/proxier.go

@@ -1595,7 +1595,7 @@ func (proxier *Proxier) syncProxyRules() {
 	}
 
 	// Finish housekeeping, clear stale conntrack entries for UDP Services
-	conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
+	conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
 }
 
 func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
pkg/proxy/ipvs/proxier.go

@@ -932,7 +932,7 @@ func (proxier *Proxier) syncProxyRules() {
 	// We assume that if this was called, we really want to sync them,
 	// even if nothing changed in the meantime. In other words, callers are
 	// responsible for detecting no-op changes and not calling this function.
-	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
+	_ = proxier.svcPortMap.Update(proxier.serviceChanges)
 	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
 
 	proxier.logger.V(3).Info("Syncing ipvs proxier rules")
@@ -1498,7 +1498,7 @@ func (proxier *Proxier) syncProxyRules() {
 	metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
 
 	// Finish housekeeping, clear stale conntrack entries for UDP Services
-	conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
+	conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
 }
 
 // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
pkg/proxy/nftables/proxier.go

@@ -1839,7 +1839,7 @@ func (proxier *Proxier) syncProxyRules() {
 	}
 
 	// Finish housekeeping, clear stale conntrack entries for UDP Services
-	conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
+	conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
 }
 
 func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {
Reference in New Issue
Block a user