diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index 6e5046f0c0b..cd39c66cb0e 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -31,16 +31,21 @@ import ( // CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints. func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { + ct := &execCT{exec} + cleanStaleEntries(ct, isIPv6, svcPortMap, serviceUpdateResult, endpointsUpdateResult) +} - deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointsUpdateResult) - deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointsUpdateResult) +func cleanStaleEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMap, + serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { + deleteStaleServiceConntrackEntries(ct, isIPv6, svcPortMap, serviceUpdateResult, endpointsUpdateResult) + deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult) } // deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related // to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack // may create "black hole" entries for that IP+port. When the service gets endpoints we // need to delete those entries so further traffic doesn't get dropped. -func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { +func deleteStaleServiceConntrackEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.New[int]() @@ -65,13 +70,13 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList()) for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() { - if err := ClearEntriesForIP(exec, svcIP, v1.ProtocolUDP); err != nil { + if err := ct.ClearEntriesForIP(svcIP, v1.ProtocolUDP); err != nil { klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP) } } klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList()) for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() { - err := ClearEntriesForPort(exec, nodePort, isIPv6, v1.ProtocolUDP) + err := ct.ClearEntriesForPort(nodePort, isIPv6, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) } @@ -81,30 +86,30 @@ func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, sv // deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related // to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries // for it so that if the same client keeps sending, the packets will get routed to a new endpoint. -func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { +func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints { if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok { endpointIP := proxyutil.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() var err error if nodePort != 0 { - err = ClearEntriesForPortNAT(exec, endpointIP, nodePort, v1.ProtocolUDP) + err = ct.ClearEntriesForPortNAT(endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = ClearEntriesForNAT(exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) + err = ct.ClearEntriesForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPs() { - err := ClearEntriesForNAT(exec, extIP.String(), endpointIP, v1.ProtocolUDP) + err := ct.ClearEntriesForNAT(extIP.String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerVIPs() { - err := ClearEntriesForNAT(exec, lbIP.String(), endpointIP, v1.ProtocolUDP) + err := ct.ClearEntriesForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go new file mode 100644 index 00000000000..1a440f18963 --- /dev/null +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -0,0 +1,263 @@ +//go:build linux +// +build linux + +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package conntrack + +import ( + "net" + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/proxy" +) + +const ( + testClusterIP = "172.30.1.1" + testExternalIP = "192.168.99.100" + testLoadBalancerIP = "1.2.3.4" + + testEndpointIP = "10.240.0.4" + + testPort = 53 + testNodePort = 5353 + testEndpointPort = "5300" +) + +func TestCleanStaleEntries(t *testing.T) { + // We need to construct a proxy.ServicePortMap to pass to CleanStaleEntries. + // ServicePortMap is just map[string]proxy.ServicePort, but there are no public + // constructors for any implementation of proxy.ServicePort, so we have to either + // provide our own implementation of that interface, or else use a + // proxy.ServiceChangeTracker to construct them and fill in the map for us. + + sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil) + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cleanup-test", + Namespace: "test", + }, + Spec: v1.ServiceSpec{ + ClusterIP: testClusterIP, + ExternalIPs: []string{testExternalIP}, + Ports: []v1.ServicePort{ + { + Name: "dns-tcp", + Port: testPort, + Protocol: v1.ProtocolTCP, + }, + { + Name: "dns-udp", + Port: testPort, + NodePort: testNodePort, + Protocol: v1.ProtocolUDP, + }, + }, + }, + Status: v1.ServiceStatus{ + LoadBalancer: v1.LoadBalancerStatus{ + Ingress: []v1.LoadBalancerIngress{{ + IP: testLoadBalancerIP, + }}, + }, + }, + } + sct.Update(nil, svc) + + svcPortMap := make(proxy.ServicePortMap) + _ = svcPortMap.Update(sct) + + // (At this point we are done with sct, and in particular, we don't use sct to + // construct UpdateServiceMapResults, because pkg/proxy already has its own tests + // for that. Also, svcPortMap is read-only from this point on.) + + tcpPortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: svc.Namespace, + Name: svc.Name, + }, + Port: svc.Spec.Ports[0].Name, + Protocol: svc.Spec.Ports[0].Protocol, + } + + udpPortName := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: svc.Namespace, + Name: svc.Name, + }, + Port: svc.Spec.Ports[1].Name, + Protocol: svc.Spec.Ports[1].Protocol, + } + + unknownPortName := udpPortName + unknownPortName.Namespace = "unknown" + + // Sanity-check to make sure we constructed the map correctly + if len(svcPortMap) != 2 { + t.Fatalf("expected svcPortMap to have 2 entries, got %+v", svcPortMap) + } + servicePort := svcPortMap[tcpPortName] + if servicePort == nil || servicePort.String() != "172.30.1.1:53/TCP" { + t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/TCP\", got %q", tcpPortName.String(), servicePort.String()) + } + servicePort = svcPortMap[udpPortName] + if servicePort == nil || servicePort.String() != "172.30.1.1:53/UDP" { + t.Fatalf("expected svcPortMap[%q] to be \"172.30.1.1:53/UDP\", got %q", udpPortName.String(), servicePort.String()) + } + + testCases := []struct { + description string + + serviceUpdates proxy.UpdateServiceMapResult + endpointsUpdates proxy.UpdateEndpointsMapResult + + result FakeInterface + }{ + { + description: "DeletedUDPClusterIPs clears entries for given clusterIPs (only)", + + serviceUpdates: proxy.UpdateServiceMapResult{ + // Note: this isn't testClusterIP; it's the IP of some + // unknown (because deleted) service. + DeletedUDPClusterIPs: sets.New("172.30.99.99"), + }, + endpointsUpdates: proxy.UpdateEndpointsMapResult{}, + + result: FakeInterface{ + ClearedIPs: sets.New("172.30.99.99"), + + ClearedPorts: sets.New[int](), + ClearedNATs: map[string]string{}, + ClearedPortNATs: map[int]string{}, + }, + }, + { + description: "DeletedUDPEndpoints clears NAT entries for all IPs and NodePorts", + + serviceUpdates: proxy.UpdateServiceMapResult{ + DeletedUDPClusterIPs: sets.New[string](), + }, + endpointsUpdates: proxy.UpdateEndpointsMapResult{ + DeletedUDPEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: net.JoinHostPort(testEndpointIP, testEndpointPort), + ServicePortName: udpPortName, + }}, + }, + + result: FakeInterface{ + ClearedIPs: sets.New[string](), + ClearedPorts: sets.New[int](), + + ClearedNATs: map[string]string{ + testClusterIP: testEndpointIP, + testExternalIP: testEndpointIP, + testLoadBalancerIP: testEndpointIP, + }, + ClearedPortNATs: map[int]string{ + testNodePort: testEndpointIP, + }, + }, + }, + { + description: "NewlyActiveUDPServices clears entries for all IPs and NodePorts", + + serviceUpdates: proxy.UpdateServiceMapResult{ + DeletedUDPClusterIPs: sets.New[string](), + }, + endpointsUpdates: proxy.UpdateEndpointsMapResult{ + DeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + NewlyActiveUDPServices: []proxy.ServicePortName{ + udpPortName, + }, + }, + + result: FakeInterface{ + ClearedIPs: sets.New(testClusterIP, testExternalIP, testLoadBalancerIP), + ClearedPorts: sets.New(testNodePort), + + ClearedNATs: map[string]string{}, + ClearedPortNATs: map[int]string{}, + }, + }, + + { + description: "DeletedUDPEndpoints for unknown Service has no effect", + + serviceUpdates: proxy.UpdateServiceMapResult{ + DeletedUDPClusterIPs: sets.New[string](), + }, + endpointsUpdates: proxy.UpdateEndpointsMapResult{ + DeletedUDPEndpoints: []proxy.ServiceEndpoint{{ + Endpoint: "10.240.0.4:80", + ServicePortName: unknownPortName, + }}, + NewlyActiveUDPServices: []proxy.ServicePortName{}, + }, + + result: FakeInterface{ + ClearedIPs: sets.New[string](), + ClearedPorts: sets.New[int](), + ClearedNATs: map[string]string{}, + ClearedPortNATs: map[int]string{}, + }, + }, + { + description: "NewlyActiveUDPServices for unknown Service has no effect", + + serviceUpdates: proxy.UpdateServiceMapResult{ + DeletedUDPClusterIPs: sets.New[string](), + }, + endpointsUpdates: proxy.UpdateEndpointsMapResult{ + DeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + NewlyActiveUDPServices: []proxy.ServicePortName{ + unknownPortName, + }, + }, + + result: FakeInterface{ + ClearedIPs: sets.New[string](), + ClearedPorts: sets.New[int](), + ClearedNATs: map[string]string{}, + ClearedPortNATs: map[int]string{}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + fake := NewFake() + cleanStaleEntries(fake, false, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates) + if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) { + t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs) + } + if !fake.ClearedPorts.Equal(tc.result.ClearedPorts) { + t.Errorf("Expected ClearedPorts=%v, got %v", tc.result.ClearedPorts, fake.ClearedPorts) + } + if !reflect.DeepEqual(fake.ClearedNATs, tc.result.ClearedNATs) { + t.Errorf("Expected ClearedNATs=%v, got %v", tc.result.ClearedNATs, fake.ClearedNATs) + } + if !reflect.DeepEqual(fake.ClearedPortNATs, tc.result.ClearedPortNATs) { + t.Errorf("Expected ClearedPortNATs=%v, got %v", tc.result.ClearedPortNATs, fake.ClearedPortNATs) + } + }) + } +} diff --git a/pkg/proxy/conntrack/conntrack.go b/pkg/proxy/conntrack/conntrack.go index 3a1925b8675..dbc6cdab3d8 100644 --- a/pkg/proxy/conntrack/conntrack.go +++ b/pkg/proxy/conntrack/conntrack.go @@ -30,7 +30,31 @@ import ( utilnet "k8s.io/utils/net" ) -// Utilities for dealing with conntrack +// Interface for dealing with conntrack +type Interface interface { + // ClearEntriesForIP deletes conntrack entries for connections of the given + // protocol, to the given IP. + ClearEntriesForIP(ip string, protocol v1.Protocol) error + + // ClearEntriesForPort deletes conntrack entries for connections of the given + // protocol and IP family, to the given port. + ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error + + // ClearEntriesForNAT deletes conntrack entries for connections of the given + // protocol, which had been DNATted from origin to dest. + ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error + + // ClearEntriesForPortNAT deletes conntrack entries for connections of the given + // protocol, which had been DNATted from the given port (on any IP) to dest. + ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error +} + +// execCT implements Interface by execing the conntrack tool +type execCT struct { + execer exec.Interface +} + +var _ Interface = &execCT{} // NoConnectionToDelete is the error string returned by conntrack when no matching connections are found const NoConnectionToDelete = "0 flow entries have been deleted" @@ -46,11 +70,25 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string { return parameters } -// ClearEntriesForIP uses the conntrack tool to delete the conntrack entries -// for the UDP connections specified by the given service IP -func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error { +// exec executes the conntrack tool using the given parameters +func (ct *execCT) exec(parameters ...string) error { + conntrackPath, err := ct.execer.LookPath("conntrack") + if err != nil { + return fmt.Errorf("error looking for path of conntrack: %v", err) + } + klog.V(4).InfoS("Clearing conntrack entries", "parameters", parameters) + output, err := ct.execer.Command(conntrackPath, parameters...).CombinedOutput() + if err != nil { + return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err) + } + klog.V(4).InfoS("Conntrack entries deleted", "output", string(output)) + return nil +} + +// ClearEntriesForIP is part of Interface +func (ct *execCT) ClearEntriesForIP(ip string, protocol v1.Protocol) error { parameters := parametersWithFamily(utilnet.IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol)) - err := Exec(execer, parameters...) + err := ct.exec(parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it @@ -60,45 +98,24 @@ func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) e return nil } -// Exec executes the conntrack tool using the given parameters -func Exec(execer exec.Interface, parameters ...string) error { - conntrackPath, err := execer.LookPath("conntrack") - if err != nil { - return fmt.Errorf("error looking for path of conntrack: %v", err) - } - klog.V(4).InfoS("Clearing conntrack entries", "parameters", parameters) - output, err := execer.Command(conntrackPath, parameters...).CombinedOutput() - if err != nil { - return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err) - } - klog.V(4).InfoS("Conntrack entries deleted", "output", string(output)) - return nil -} - -// ClearEntriesForPort uses the conntrack tool to delete the conntrack entries -// for connections specified by the port. -// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet. -// The solution is clearing the conntrack. Known issues: -// https://github.com/docker/docker/issues/8795 -// https://github.com/kubernetes/kubernetes/issues/31983 -func ClearEntriesForPort(execer exec.Interface, port int, isIPv6 bool, protocol v1.Protocol) error { +// ClearEntriesForPort is part of Interface +func (ct *execCT) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error { if port <= 0 { return fmt.Errorf("wrong port number. The port number must be greater than zero") } parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port)) - err := Exec(execer, parameters...) + err := ct.exec(parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil } -// ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries -// for connections specified by the {origin, dest} IP pair. -func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error { +// ClearEntriesForNAT is part of Interface +func (ct *execCT) ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error { parameters := parametersWithFamily(utilnet.IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", protoStr(protocol)) - err := Exec(execer, parameters...) + err := ct.exec(parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it @@ -108,16 +125,13 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. return nil } -// ClearEntriesForPortNAT uses the conntrack tool to delete the conntrack entries -// for connections specified by the {dest IP, port} pair. -// Known issue: -// https://github.com/kubernetes/kubernetes/issues/59368 -func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error { +// ClearEntriesForPortNAT is part of Interface +func (ct *execCT) ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error { if port <= 0 { return fmt.Errorf("wrong port number. The port number must be greater than zero") } parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) - err := Exec(execer, parameters...) + err := ct.exec(parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } diff --git a/pkg/proxy/conntrack/conntrack_test.go b/pkg/proxy/conntrack/conntrack_test.go index 6b8b725c648..1d9e3fd9f3f 100644 --- a/pkg/proxy/conntrack/conntrack_test.go +++ b/pkg/proxy/conntrack/conntrack_test.go @@ -34,7 +34,13 @@ var nothingToDelete = func() ([]byte, []byte, error) { return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") } -func makeCT(result fakeexec.FakeAction) (*fakeexec.FakeExec, *fakeexec.FakeCmd) { +type testCT struct { + execCT + + fcmd *fakeexec.FakeCmd +} + +func makeCT(result fakeexec.FakeAction) *testCT { fcmd := &fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{result}, } @@ -45,18 +51,18 @@ func makeCT(result fakeexec.FakeAction) (*fakeexec.FakeExec, *fakeexec.FakeCmd) LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } - return fexec, fcmd + return &testCT{execCT{fexec}, fcmd} } -// Gets the command that fexec executed. (If it didn't execute any commands, this will +// Gets the command that ct executed. (If it didn't execute any commands, this will // return "".) -func getExecutedCommand(fexec *fakeexec.FakeExec, fcmd *fakeexec.FakeCmd) string { +func (ct *testCT) getExecutedCommand() string { // FakeExec panics if you try to run more commands than you set it up for. So the // only possibilities here are that we ran 1 command or we ran 0. - if fexec.CommandCalls != 1 { + if ct.execer.(*fakeexec.FakeExec).CommandCalls != 1 { return "" } - return strings.Join(fcmd.CombinedOutputLog[0], " ") + return strings.Join(ct.fcmd.CombinedOutputLog[0], " ") } func TestExec(t *testing.T) { @@ -78,8 +84,8 @@ func TestExec(t *testing.T) { } for _, tc := range testCases { - fexec, fcmd := makeCT(tc.result) - err := Exec(fexec, tc.args...) + ct := makeCT(tc.result) + err := ct.exec(tc.args...) if tc.expectErr { if err == nil { t.Errorf("expected err, got %v", err) @@ -90,7 +96,7 @@ func TestExec(t *testing.T) { } } - execCmd := getExecutedCommand(fexec, fcmd) + execCmd := ct.getExecutedCommand() expectCmd := "conntrack " + strings.Join(tc.args, " ") if execCmd != expectCmd { t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd) @@ -120,17 +126,17 @@ func TestClearEntriesForIP(t *testing.T) { } for _, tc := range testCases { - fexec, fcmd := makeCT(success) - if err := ClearEntriesForIP(fexec, tc.ip, v1.ProtocolUDP); err != nil { + ct := makeCT(success) + if err := ct.ClearEntriesForIP(tc.ip, v1.ProtocolUDP); err != nil { t.Errorf("%s/success: Unexpected error: %v", tc.name, err) } - execCommand := getExecutedCommand(fexec, fcmd) + execCommand := ct.getExecutedCommand() if tc.expectCommand != execCommand { t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand) } - fexec, _ = makeCT(nothingToDelete) - if err := ClearEntriesForIP(fexec, tc.ip, v1.ProtocolUDP); err != nil { + ct = makeCT(nothingToDelete) + if err := ct.ClearEntriesForIP(tc.ip, v1.ProtocolUDP); err != nil { t.Errorf("%s/nothing to delete: Unexpected error: %v", tc.name, err) } } @@ -161,18 +167,18 @@ func TestClearEntriesForPort(t *testing.T) { } for _, tc := range testCases { - fexec, fcmd := makeCT(success) - err := ClearEntriesForPort(fexec, tc.port, tc.isIPv6, v1.ProtocolUDP) + ct := makeCT(success) + err := ct.ClearEntriesForPort(tc.port, tc.isIPv6, v1.ProtocolUDP) if err != nil { t.Errorf("%s/success: Unexpected error: %v", tc.name, err) } - execCommand := getExecutedCommand(fexec, fcmd) + execCommand := ct.getExecutedCommand() if tc.expectCommand != execCommand { t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand) } - fexec, _ = makeCT(nothingToDelete) - err = ClearEntriesForPort(fexec, tc.port, tc.isIPv6, v1.ProtocolUDP) + ct = makeCT(nothingToDelete) + err = ct.ClearEntriesForPort(tc.port, tc.isIPv6, v1.ProtocolUDP) if err != nil { t.Errorf("%s/nothing to delete: Unexpected error: %v", tc.name, err) } @@ -204,18 +210,18 @@ func TestClearEntriesForNAT(t *testing.T) { } for _, tc := range testCases { - fexec, fcmd := makeCT(success) - err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP) + ct := makeCT(success) + err := ct.ClearEntriesForNAT(tc.origin, tc.dest, v1.ProtocolUDP) if err != nil { t.Errorf("%s/success: unexpected error: %v", tc.name, err) } - execCommand := getExecutedCommand(fexec, fcmd) + execCommand := ct.getExecutedCommand() if tc.expectCommand != execCommand { t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand) } - fexec, _ = makeCT(nothingToDelete) - err = ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP) + ct = makeCT(nothingToDelete) + err = ct.ClearEntriesForNAT(tc.origin, tc.dest, v1.ProtocolUDP) if err != nil { t.Errorf("%s/nothing to delete: unexpected error: %v", tc.name, err) } @@ -247,18 +253,18 @@ func TestClearEntriesForPortNAT(t *testing.T) { } for _, tc := range testCases { - fexec, fcmd := makeCT(success) - err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP) + ct := makeCT(success) + err := ct.ClearEntriesForPortNAT(tc.dest, tc.port, v1.ProtocolUDP) if err != nil { t.Errorf("%s/success: unexpected error: %v", tc.name, err) } - execCommand := getExecutedCommand(fexec, fcmd) + execCommand := ct.getExecutedCommand() if tc.expectCommand != execCommand { t.Errorf("%s/success: Expect command: %s, but executed %s", tc.name, tc.expectCommand, execCommand) } - fexec, _ = makeCT(nothingToDelete) - err = ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP) + ct = makeCT(nothingToDelete) + err = ct.ClearEntriesForPortNAT(tc.dest, tc.port, v1.ProtocolUDP) if err != nil { t.Errorf("%s/nothing to delete: unexpected error: %v", tc.name, err) } diff --git a/pkg/proxy/conntrack/fake.go b/pkg/proxy/conntrack/fake.go new file mode 100644 index 00000000000..6eb6d11c679 --- /dev/null +++ b/pkg/proxy/conntrack/fake.go @@ -0,0 +1,95 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package conntrack + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" +) + +// FakeInterface implements Interface by just recording entries that have been cleared. +type FakeInterface struct { + ClearedIPs sets.Set[string] + ClearedPorts sets.Set[int] + ClearedNATs map[string]string // origin -> dest + ClearedPortNATs map[int]string // port -> dest +} + +var _ Interface = &FakeInterface{} + +// NewFake creates a new FakeInterface +func NewFake() *FakeInterface { + fake := &FakeInterface{} + fake.Reset() + return fake +} + +// Reset clears fake's sets/maps +func (fake *FakeInterface) Reset() { + fake.ClearedIPs = sets.New[string]() + fake.ClearedPorts = sets.New[int]() + fake.ClearedNATs = make(map[string]string) + fake.ClearedPortNATs = make(map[int]string) +} + +// ClearEntriesForIP is part of Interface +func (fake *FakeInterface) ClearEntriesForIP(ip string, protocol v1.Protocol) error { + if protocol != v1.ProtocolUDP { + return fmt.Errorf("FakeInterface currently only supports UDP") + } + + fake.ClearedIPs.Insert(ip) + return nil +} + +// ClearEntriesForPort is part of Interface +func (fake *FakeInterface) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error { + if protocol != v1.ProtocolUDP { + return fmt.Errorf("FakeInterface currently only supports UDP") + } + + fake.ClearedPorts.Insert(port) + return nil +} + +// ClearEntriesForNAT is part of Interface +func (fake *FakeInterface) ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error { + if protocol != v1.ProtocolUDP { + return fmt.Errorf("FakeInterface currently only supports UDP") + } + if previous, exists := fake.ClearedNATs[origin]; exists && previous != dest { + return fmt.Errorf("ClearEntriesForNAT called with same origin (%s), different destination (%s / %s)", origin, previous, dest) + } + + fake.ClearedNATs[origin] = dest + return nil +} + +// ClearEntriesForPortNAT is part of Interface +func (fake *FakeInterface) ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error { + if protocol != v1.ProtocolUDP { + return fmt.Errorf("FakeInterface currently only supports UDP") + } + if previous, exists := fake.ClearedPortNATs[port]; exists && previous != dest { + return fmt.Errorf("ClearEntriesForPortNAT called with same port (%d), different destination (%s / %s)", port, previous, dest) + } + + fake.ClearedPortNATs[port] = dest + return nil +}