From cdf934d5bc9709a1db33dfb7ec4a72d26d099a9c Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 14 Jan 2024 09:06:35 -0500 Subject: [PATCH] Remove redundant iptables/nftables conntrack cleanup tests The iptables and nftables proxy backends had 2 unit tests (TestDeleteEndpointConnections and TestProxierDeleteNodePortStaleUDP) that were effectively testing that: - If the proxy saw various Service/EndpointSlice events this would result in specific changes to the service/endpoints trackers, AND - If the service/endpoints trackers changed in those specific ways this would result in specific UpdateServiceMapResult and UpdateEndpointsMapResult values being generated, AND - If you passed those specific UpdateServiceMapResult and UpdateEndpointsMapResult values to conntrack.CleanStaleEntries it would make specific calls to the lower-level conntrack methods, AND - If you called the lower-level conntrack methods with those specific arguments, it would result in specific executions of the conntrack binary, mixed with a specific number of klog invocations. This... is not a good unit test. We already test the change tracker behavior in other unit tests, and we already tested the Update{Service,Endpoints}MapResult behavior in the pkg/proxy unit tests, and we already tested the conntrack exec behavior in pkg/proxy/conntrack/conntrack_test.go, and we now test the CleanStaleEntries behavior in pkg/proxy/conntrack/cleanup_test.go. So there is no need to try to test the top-to-bottom behavior as a "unit test". --- pkg/proxy/conntrack/conntrack.go | 12 +- pkg/proxy/iptables/proxier_test.go | 344 ----------------------------- pkg/proxy/nftables/proxier_test.go | 338 ---------------------------- 3 files changed, 6 insertions(+), 688 deletions(-) diff --git a/pkg/proxy/conntrack/conntrack.go b/pkg/proxy/conntrack/conntrack.go index dbc6cdab3d8..dc64ec092cb 100644 --- a/pkg/proxy/conntrack/conntrack.go +++ b/pkg/proxy/conntrack/conntrack.go @@ -56,8 +56,8 @@ type execCT struct { var _ Interface = &execCT{} -// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found -const NoConnectionToDelete = "0 flow entries have been deleted" +// noConnectionToDelete is the error string returned by conntrack when no matching connections are found +const noConnectionToDelete = "0 flow entries have been deleted" func protoStr(proto v1.Protocol) string { return strings.ToLower(string(proto)) @@ -89,7 +89,7 @@ func (ct *execCT) exec(parameters ...string) error { func (ct *execCT) ClearEntriesForIP(ip string, protocol v1.Protocol) error { parameters := parametersWithFamily(utilnet.IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol)) err := ct.exec(parameters...) - if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby-sit all udp connections to kubernetes services. @@ -105,7 +105,7 @@ func (ct *execCT) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protoco } parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port)) err := ct.exec(parameters...) - if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil @@ -116,7 +116,7 @@ func (ct *execCT) ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) parameters := parametersWithFamily(utilnet.IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, "-p", protoStr(protocol)) err := ct.exec(parameters...) - if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. @@ -132,7 +132,7 @@ func (ct *execCT) ClearEntriesForPortNAT(dest string, port int, protocol v1.Prot } parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) err := ct.exec(parameters...) - if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { + if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 32ab725fd4d..067e5cd93cf 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -48,7 +48,6 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/healthcheck" @@ -58,221 +57,11 @@ import ( "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" - "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) -func TestDeleteEndpointConnections(t *testing.T) { - const ( - UDP = v1.ProtocolUDP - TCP = v1.ProtocolTCP - SCTP = v1.ProtocolSCTP - ) - - testCases := []struct { - description string - svcName string - svcIP string - svcPort int32 - protocol v1.Protocol - endpoint string // IP:port endpoint - simulatedErr string - }{ - { - description: "V4 UDP", - svcName: "v4-udp", - svcIP: "172.30.1.1", - svcPort: 80, - protocol: UDP, - endpoint: "10.240.0.3:80", - }, - { - description: "V4 TCP", - svcName: "v4-tcp", - svcIP: "172.30.2.2", - svcPort: 80, - protocol: TCP, - endpoint: "10.240.0.4:80", - }, - { - description: "V4 SCTP", - svcName: "v4-sctp", - svcIP: "172.30.3.3", - svcPort: 80, - protocol: SCTP, - endpoint: "10.240.0.5:80", - }, - { - description: "V4 UDP, nothing to delete, benign error", - svcName: "v4-udp-nothing-to-delete", - svcIP: "172.30.4.4", - svcPort: 80, - protocol: UDP, - endpoint: "10.240.0.6:80", - simulatedErr: conntrack.NoConnectionToDelete, - }, - { - description: "V4 UDP, unexpected error, should be glogged", - svcName: "v4-udp-simulated-error", - svcIP: "172.30.5.5", - svcPort: 80, - protocol: UDP, - endpoint: "10.240.0.7:80", - simulatedErr: "simulated error", - }, - { - description: "V6 UDP", - svcName: "v6-udp", - svcIP: "fd00:1234::20", - svcPort: 80, - protocol: UDP, - endpoint: "[2001:db8::2]:80", - }, - { - description: "V6 TCP", - svcName: "v6-tcp", - svcIP: "fd00:1234::30", - svcPort: 80, - protocol: TCP, - endpoint: "[2001:db8::3]:80", - }, - { - description: "V6 SCTP", - svcName: "v6-sctp", - svcIP: "fd00:1234::40", - svcPort: 80, - protocol: SCTP, - endpoint: "[2001:db8::4]:80", - }, - } - - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - priorGlogErrs := klog.Stats.Error.Lines() - - // Create a fake executor for the conntrack utility. - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } - - if tc.protocol == UDP { - cmdOutput := "1 flow entries have been deleted" - var simErr error - - // First call outputs cmdOutput and succeeds - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, - func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }, - ) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - - // Second call may succeed or fail - if tc.simulatedErr != "" { - cmdOutput = "" - simErr = fmt.Errorf(tc.simulatedErr) - } - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, - func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }, - ) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - } - - endpointIP := proxyutil.IPPart(tc.endpoint) - isIPv6 := netutils.IsIPv6String(endpointIP) - - var ipt utiliptables.Interface - if isIPv6 { - ipt = iptablestest.NewIPv6Fake() - } else { - ipt = iptablestest.NewFake() - } - fp := NewFakeProxier(ipt) - fp.exec = fexec - - makeServiceMap(fp, - makeTestService("ns1", tc.svcName, func(svc *v1.Service) { - svc.Spec.ClusterIP = tc.svcIP - svc.Spec.Ports = []v1.ServicePort{{ - Name: "p80", - Port: tc.svcPort, - Protocol: tc.protocol, - }} - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }), - ) - fp.svcPortMap.Update(fp.serviceChanges) - - slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) { - if isIPv6 { - eps.AddressType = discovery.AddressTypeIPv6 - } else { - eps.AddressType = discovery.AddressTypeIPv4 - } - eps.Endpoints = []discovery.Endpoint{{ - Addresses: []string{endpointIP}, - }} - eps.Ports = []discovery.EndpointPort{{ - Name: ptr.To("p80"), - Port: ptr.To[int32](80), - Protocol: ptr.To(tc.protocol), - }} - }) - - // Add and then remove the endpoint slice - fp.OnEndpointSliceAdd(slice) - fp.syncProxyRules() - fp.OnEndpointSliceDelete(slice) - fp.syncProxyRules() - - // Check the executed conntrack command - if tc.protocol == UDP { - if fexec.CommandCalls != 2 { - t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls) - } - - // First clear conntrack entries for the clusterIP when the - // endpoint is first added. - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP) - if isIPv6 { - expectCommand += " -f ipv6" - } - actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") - if actualCommand != expectCommand { - t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) - } - - // Then clear conntrack entries for the endpoint when it is - // deleted. - expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) - if isIPv6 { - expectCommand += " -f ipv6" - } - actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") - if actualCommand != expectCommand { - t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) - } - } else if fexec.CommandCalls != 0 { - t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls) - } - - // Check the number of new glog errors - var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { - expGlogErrs = 1 - } - glogErrs := klog.Stats.Error.Lines() - priorGlogErrs - if glogErrs != expGlogErrs { - t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs) - } - }) - } -} - // Conventions for tests using NewFakeProxier: // // Pod IPs: 10.0.0.0/8 @@ -4337,139 +4126,6 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) { } } -func TestProxierDeleteNodePortStaleUDP(t *testing.T) { - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } - cmdOutput := "1 flow entries have been deleted" - cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil } - - // Delete ClusterIP entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - // Delete ExternalIP entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - // Delete LoadBalancerIP entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - // Delete NodePort entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - - ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) - fp.exec = fexec - - svcIP := "172.30.0.41" - extIP := "192.168.99.11" - lbIngressIP := "1.2.3.4" - svcPort := 80 - nodePort := 31201 - svcPortName := proxy.ServicePortName{ - NamespacedName: makeNSN("ns1", "svc1"), - Port: "p80", - Protocol: v1.ProtocolUDP, - } - - makeServiceMap(fp, - makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { - svc.Spec.ClusterIP = svcIP - svc.Spec.ExternalIPs = []string{extIP} - svc.Spec.Type = "LoadBalancer" - svc.Spec.Ports = []v1.ServicePort{{ - Name: svcPortName.Port, - Port: int32(svcPort), - Protocol: v1.ProtocolUDP, - NodePort: int32(nodePort), - }} - svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ - IP: lbIngressIP, - }} - }), - ) - - fp.syncProxyRules() - if fexec.CommandCalls != 0 { - t.Fatalf("Created service without endpoints must not clear conntrack entries") - } - - epIP := "10.180.0.1" - populateEndpointSlices(fp, - makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { - eps.AddressType = discovery.AddressTypeIPv4 - eps.Endpoints = []discovery.Endpoint{{ - Addresses: []string{epIP}, - Conditions: discovery.EndpointConditions{ - Serving: ptr.To(false), - }, - }} - eps.Ports = []discovery.EndpointPort{{ - Name: ptr.To(svcPortName.Port), - Port: ptr.To(int32(svcPort)), - Protocol: ptr.To(v1.ProtocolUDP), - }} - }), - ) - - fp.syncProxyRules() - - if fexec.CommandCalls != 0 { - t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries") - } - - populateEndpointSlices(fp, - makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { - eps.AddressType = discovery.AddressTypeIPv4 - eps.Endpoints = []discovery.Endpoint{{ - Addresses: []string{epIP}, - Conditions: discovery.EndpointConditions{ - Serving: ptr.To(true), - }, - }} - eps.Ports = []discovery.EndpointPort{{ - Name: ptr.To(svcPortName.Port), - Port: ptr.To(int32(svcPort)), - Protocol: ptr.To(v1.ProtocolUDP), - }} - }), - ) - - fp.syncProxyRules() - - if fexec.CommandCalls != 4 { - t.Fatalf("Updated UDP service with new endpoints must clear UDP entries 4 times: ClusterIP, NodePort, ExternalIP and LB") - } - - // the order is not guaranteed so we have to compare the strings in any order - expectedCommands := []string{ - // Delete ClusterIP Conntrack entries - fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))), - // Delete ExternalIP Conntrack entries - fmt.Sprintf("conntrack -D --orig-dst %s -p %s", extIP, strings.ToLower(string((v1.ProtocolUDP)))), - // Delete LoadBalancerIP Conntrack entries - fmt.Sprintf("conntrack -D --orig-dst %s -p %s", lbIngressIP, strings.ToLower(string((v1.ProtocolUDP)))), - // Delete NodePort Conntrack entrie - fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort), - } - actualCommands := []string{ - strings.Join(fcmd.CombinedOutputLog[0], " "), - strings.Join(fcmd.CombinedOutputLog[1], " "), - strings.Join(fcmd.CombinedOutputLog[2], " "), - strings.Join(fcmd.CombinedOutputLog[3], " "), - } - sort.Strings(expectedCommands) - sort.Strings(actualCommands) - - if !reflect.DeepEqual(expectedCommands, actualCommands) { - t.Errorf("Expected commands: %v, but executed %v", expectedCommands, actualCommands) - } -} - func TestProxierMetricsIptablesTotalRules(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index 5f91b8acb5f..f1486401c75 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -23,8 +23,6 @@ import ( "fmt" "net" "reflect" - "sort" - "strings" "testing" "time" @@ -39,10 +37,8 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" - "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/healthcheck" @@ -50,213 +46,11 @@ import ( proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" - "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) -func TestDeleteEndpointConnections(t *testing.T) { - const ( - UDP = v1.ProtocolUDP - TCP = v1.ProtocolTCP - SCTP = v1.ProtocolSCTP - ) - - testCases := []struct { - description string - svcName string - svcIP string - svcPort int32 - protocol v1.Protocol - endpoint string // IP:port endpoint - simulatedErr string - }{ - { - description: "V4 UDP", - svcName: "v4-udp", - svcIP: "172.30.1.1", - svcPort: 80, - protocol: UDP, - endpoint: "10.240.0.3:80", - }, - { - description: "V4 TCP", - svcName: "v4-tcp", - svcIP: "172.30.2.2", - svcPort: 80, - protocol: TCP, - endpoint: "10.240.0.4:80", - }, - { - description: "V4 SCTP", - svcName: "v4-sctp", - svcIP: "172.30.3.3", - svcPort: 80, - protocol: SCTP, - endpoint: "10.240.0.5:80", - }, - { - description: "V4 UDP, nothing to delete, benign error", - svcName: "v4-udp-nothing-to-delete", - svcIP: "172.30.4.4", - svcPort: 80, - protocol: UDP, - endpoint: "10.240.0.6:80", - simulatedErr: conntrack.NoConnectionToDelete, - }, - { - description: "V4 UDP, unexpected error, should be glogged", - svcName: "v4-udp-simulated-error", - svcIP: "172.30.5.5", - svcPort: 80, - protocol: UDP, - endpoint: "10.240.0.7:80", - simulatedErr: "simulated error", - }, - { - description: "V6 UDP", - svcName: "v6-udp", - svcIP: "fd00:1234::20", - svcPort: 80, - protocol: UDP, - endpoint: "[2001:db8::2]:80", - }, - { - description: "V6 TCP", - svcName: "v6-tcp", - svcIP: "fd00:1234::30", - svcPort: 80, - protocol: TCP, - endpoint: "[2001:db8::3]:80", - }, - { - description: "V6 SCTP", - svcName: "v6-sctp", - svcIP: "fd00:1234::40", - svcPort: 80, - protocol: SCTP, - endpoint: "[2001:db8::4]:80", - }, - } - - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - priorGlogErrs := klog.Stats.Error.Lines() - - // Create a fake executor for the conntrack utility. - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } - - if tc.protocol == UDP { - cmdOutput := "1 flow entries have been deleted" - var simErr error - - // First call outputs cmdOutput and succeeds - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, - func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }, - ) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - - // Second call may succeed or fail - if tc.simulatedErr != "" { - cmdOutput = "" - simErr = fmt.Errorf(tc.simulatedErr) - } - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, - func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }, - ) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - } - - endpointIP := proxyutil.IPPart(tc.endpoint) - _, fp := NewFakeProxier(proxyutil.GetIPFamilyFromIP(netutils.ParseIPSloppy(endpointIP))) - fp.exec = fexec - - makeServiceMap(fp, - makeTestService("ns1", tc.svcName, func(svc *v1.Service) { - svc.Spec.ClusterIP = tc.svcIP - svc.Spec.Ports = []v1.ServicePort{{ - Name: "p80", - Port: tc.svcPort, - Protocol: tc.protocol, - }} - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }), - ) - fp.svcPortMap.Update(fp.serviceChanges) - - slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) { - if fp.ipFamily == v1.IPv6Protocol { - eps.AddressType = discovery.AddressTypeIPv6 - } else { - eps.AddressType = discovery.AddressTypeIPv4 - } - eps.Endpoints = []discovery.Endpoint{{ - Addresses: []string{endpointIP}, - }} - eps.Ports = []discovery.EndpointPort{{ - Name: ptr.To("p80"), - Port: ptr.To[int32](80), - Protocol: ptr.To(tc.protocol), - }} - }) - - // Add and then remove the endpoint slice - fp.OnEndpointSliceAdd(slice) - fp.syncProxyRules() - fp.OnEndpointSliceDelete(slice) - fp.syncProxyRules() - - // Check the executed conntrack command - if tc.protocol == UDP { - if fexec.CommandCalls != 2 { - t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls) - } - - // First clear conntrack entries for the clusterIP when the - // endpoint is first added. - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP) - if fp.ipFamily == v1.IPv6Protocol { - expectCommand += " -f ipv6" - } - actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") - if actualCommand != expectCommand { - t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) - } - - // Then clear conntrack entries for the endpoint when it is - // deleted. - expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) - if fp.ipFamily == v1.IPv6Protocol { - expectCommand += " -f ipv6" - } - actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") - if actualCommand != expectCommand { - t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) - } - } else if fexec.CommandCalls != 0 { - t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls) - } - - // Check the number of new glog errors - var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { - expGlogErrs = 1 - } - glogErrs := klog.Stats.Error.Lines() - priorGlogErrs - if glogErrs != expGlogErrs { - t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs) - } - }) - } -} - // Conventions for tests using NewFakeProxier: // // Pod IPs: 10.0.0.0/8 @@ -2737,138 +2531,6 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) { } } -func TestProxierDeleteNodePortStaleUDP(t *testing.T) { - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } - cmdOutput := "1 flow entries have been deleted" - cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil } - - // Delete ClusterIP entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - // Delete ExternalIP entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - // Delete LoadBalancerIP entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - // Delete NodePort entries - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - - _, fp := NewFakeProxier(v1.IPv4Protocol) - fp.exec = fexec - - svcIP := "172.30.0.41" - extIP := "192.168.99.11" - lbIngressIP := "1.2.3.4" - svcPort := 80 - nodePort := 31201 - svcPortName := proxy.ServicePortName{ - NamespacedName: makeNSN("ns1", "svc1"), - Port: "p80", - Protocol: v1.ProtocolUDP, - } - - makeServiceMap(fp, - makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { - svc.Spec.ClusterIP = svcIP - svc.Spec.ExternalIPs = []string{extIP} - svc.Spec.Type = "LoadBalancer" - svc.Spec.Ports = []v1.ServicePort{{ - Name: svcPortName.Port, - Port: int32(svcPort), - Protocol: v1.ProtocolUDP, - NodePort: int32(nodePort), - }} - svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{ - IP: lbIngressIP, - }} - }), - ) - - fp.syncProxyRules() - if fexec.CommandCalls != 0 { - t.Fatalf("Created service without endpoints must not clear conntrack entries") - } - - epIP := "10.180.0.1" - populateEndpointSlices(fp, - makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { - eps.AddressType = discovery.AddressTypeIPv4 - eps.Endpoints = []discovery.Endpoint{{ - Addresses: []string{epIP}, - Conditions: discovery.EndpointConditions{ - Serving: ptr.To(false), - }, - }} - eps.Ports = []discovery.EndpointPort{{ - Name: ptr.To(svcPortName.Port), - Port: ptr.To(int32(svcPort)), - Protocol: ptr.To(v1.ProtocolUDP), - }} - }), - ) - - fp.syncProxyRules() - - if fexec.CommandCalls != 0 { - t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries") - } - - populateEndpointSlices(fp, - makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) { - eps.AddressType = discovery.AddressTypeIPv4 - eps.Endpoints = []discovery.Endpoint{{ - Addresses: []string{epIP}, - Conditions: discovery.EndpointConditions{ - Serving: ptr.To(true), - }, - }} - eps.Ports = []discovery.EndpointPort{{ - Name: ptr.To(svcPortName.Port), - Port: ptr.To(int32(svcPort)), - Protocol: ptr.To(v1.ProtocolUDP), - }} - }), - ) - - fp.syncProxyRules() - - if fexec.CommandCalls != 4 { - t.Fatalf("Updated UDP service with new endpoints must clear UDP entries 4 times: ClusterIP, NodePort, ExternalIP and LB") - } - - // the order is not guaranteed so we have to compare the strings in any order - expectedCommands := []string{ - // Delete ClusterIP Conntrack entries - fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))), - // Delete ExternalIP Conntrack entries - fmt.Sprintf("conntrack -D --orig-dst %s -p %s", extIP, strings.ToLower(string((v1.ProtocolUDP)))), - // Delete LoadBalancerIP Conntrack entries - fmt.Sprintf("conntrack -D --orig-dst %s -p %s", lbIngressIP, strings.ToLower(string((v1.ProtocolUDP)))), - // Delete NodePort Conntrack entrie - fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort), - } - actualCommands := []string{ - strings.Join(fcmd.CombinedOutputLog[0], " "), - strings.Join(fcmd.CombinedOutputLog[1], " "), - strings.Join(fcmd.CombinedOutputLog[2], " "), - strings.Join(fcmd.CombinedOutputLog[3], " "), - } - sort.Strings(expectedCommands) - sort.Strings(actualCommands) - - if !reflect.DeepEqual(expectedCommands, actualCommands) { - t.Errorf("Expected commands: %v, but executed %v", expectedCommands, actualCommands) - } -} - // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. // This test ensures that the iptables proxier supports translating Endpoints to