diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 0e10b6899d8..7e2793145bb 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -60,7 +60,7 @@ import ( "k8s.io/utils/pointer" ) -func TestDeleteEndpointConnectionsIPv4(t *testing.T) { +func TestDeleteEndpointConnections(t *testing.T) { const ( UDP = v1.ProtocolUDP TCP = v1.ProtocolTCP @@ -73,8 +73,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { svcIP string svcPort int32 protocol v1.Protocol - endpoint string // IP:port endpoint - epSvcPair proxy.ServiceEndpoint // Will be generated by test + endpoint string // IP:port endpoint simulatedErr string }{ { @@ -104,140 +103,21 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { { description: "V4 UDP, nothing to delete, benign error", svcName: "v4-udp-nothing-to-delete", - svcIP: "172.30.1.1", + svcIP: "172.30.4.4", svcPort: 80, protocol: UDP, - endpoint: "10.240.0.3:80", + endpoint: "10.240.0.6:80", simulatedErr: conntrack.NoConnectionToDelete, }, { description: "V4 UDP, unexpected error, should be glogged", svcName: "v4-udp-simulated-error", - svcIP: "172.30.1.1", + svcIP: "172.30.5.5", svcPort: 80, protocol: UDP, - endpoint: "10.240.0.3:80", + endpoint: "10.240.0.7:80", simulatedErr: "simulated error", }, - } - - // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } - for _, tc := range testCases { - if tc.protocol == UDP { - var cmdOutput string - var simErr error - if tc.simulatedErr == "" { - cmdOutput = "1 flow entries have been deleted" - } else { - simErr = fmt.Errorf(tc.simulatedErr) - } - cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr } - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - } - } - - ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) - fp.exec = fexec - - for _, tc := range testCases { - makeServiceMap(fp, - makeTestService("ns1", tc.svcName, func(svc *v1.Service) { - svc.Spec.ClusterIP = tc.svcIP - svc.Spec.Ports = []v1.ServicePort{{ - Name: "p80", - Port: tc.svcPort, - Protocol: tc.protocol, - }} - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }), - ) - - fp.svcPortMap.Update(fp.serviceChanges) - } - - // Run the test cases - for _, tc := range testCases { - priorExecs := fexec.CommandCalls - priorGlogErrs := klog.Stats.Error.Lines() - - svc := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, - Port: "p80", - Protocol: tc.protocol, - } - input := []proxy.ServiceEndpoint{ - { - Endpoint: tc.endpoint, - ServicePortName: svc, - }, - } - - fp.deleteUDPEndpointConnections(input) - - // For UDP and SCTP connections, check the executed conntrack command - var expExecs int - if tc.protocol == UDP { - isIPv6 := func(ip string) bool { - netIP := netutils.ParseIPSloppy(ip) - return netIP.To4() == nil - } - endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) - if isIPv6(endpointIP) { - expectCommand += " -f ipv6" - } - actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ") - if actualCommand != expectCommand { - t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand) - } - expExecs = 1 - } - - // Check the number of times conntrack was executed - execs := fexec.CommandCalls - priorExecs - if execs != expExecs { - t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) - } - - // Check the number of new glog errors - var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { - expGlogErrs = 1 - } - glogErrs := klog.Stats.Error.Lines() - priorGlogErrs - if glogErrs != expGlogErrs { - t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) - } - } -} - -func TestDeleteEndpointConnectionsIPv6(t *testing.T) { - const ( - UDP = v1.ProtocolUDP - TCP = v1.ProtocolTCP - SCTP = v1.ProtocolSCTP - ) - - testCases := []struct { - description string - svcName string - svcIP string - svcPort int32 - protocol v1.Protocol - endpoint string // IP:port endpoint - epSvcPair proxy.ServiceEndpoint // Will be generated by test - simulatedErr string - }{ { description: "V6 UDP", svcName: "v6-udp", @@ -264,103 +144,128 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { }, } - // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP connections, since no conntrack cleanup is needed for TCP - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } for _, tc := range testCases { - if tc.protocol == UDP { - var cmdOutput string - var simErr error - if tc.simulatedErr == "" { - cmdOutput = "1 flow entries have been deleted" - } else { - simErr = fmt.Errorf(tc.simulatedErr) + t.Run(tc.description, func(t *testing.T) { + priorGlogErrs := klog.Stats.Error.Lines() + + // Create a fake executor for the conntrack utility. + fcmd := fakeexec.FakeCmd{} + fexec := &fakeexec.FakeExec{ + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } - cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr } - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - } - } - - ipt := iptablestest.NewIPv6Fake() - fp := NewFakeProxier(ipt) - fp.exec = fexec - - for _, tc := range testCases { - makeServiceMap(fp, - makeTestService("ns1", tc.svcName, func(svc *v1.Service) { - svc.Spec.ClusterIP = tc.svcIP - svc.Spec.Ports = []v1.ServicePort{{ - Name: "p80", - Port: tc.svcPort, - Protocol: tc.protocol, - }} - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }), - ) - - fp.svcPortMap.Update(fp.serviceChanges) - } - - // Run the test cases - for _, tc := range testCases { - priorExecs := fexec.CommandCalls - priorGlogErrs := klog.Stats.Error.Lines() - - svc := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, - Port: "p80", - Protocol: tc.protocol, - } - input := []proxy.ServiceEndpoint{ - { - Endpoint: tc.endpoint, - ServicePortName: svc, - }, - } - - fp.deleteUDPEndpointConnections(input) - - // For UDP connections, check the executed conntrack command - var expExecs int - if tc.protocol == UDP { - isIPv6 := func(ip string) bool { - netIP := netutils.ParseIPSloppy(ip) - return netIP.To4() == nil + execFunc := func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } + + if tc.protocol == UDP { + cmdOutput := "1 flow entries have been deleted" + var simErr error + + // First call outputs cmdOutput and succeeds + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, + func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }, + ) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + + // Second call may succeed or fail + if tc.simulatedErr != "" { + cmdOutput = "" + simErr = fmt.Errorf(tc.simulatedErr) + } + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, + func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }, + ) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + } + endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) - if isIPv6(endpointIP) { - expectCommand += " -f ipv6" - } - actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ") - if actualCommand != expectCommand { - t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand) - } - expExecs = 1 - } + isIPv6 := netutils.IsIPv6String(endpointIP) - // Check the number of times conntrack was executed - execs := fexec.CommandCalls - priorExecs - if execs != expExecs { - t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) - } + var ipt utiliptables.Interface + if isIPv6 { + ipt = iptablestest.NewIPv6Fake() + } else { + ipt = iptablestest.NewFake() + } + fp := NewFakeProxier(ipt) + fp.exec = fexec - // Check the number of new glog errors - var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { - expGlogErrs = 1 - } - glogErrs := klog.Stats.Error.Lines() - priorGlogErrs - if glogErrs != expGlogErrs { - t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) - } + makeServiceMap(fp, + makeTestService("ns1", tc.svcName, func(svc *v1.Service) { + svc.Spec.ClusterIP = tc.svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: tc.svcPort, + Protocol: tc.protocol, + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }), + ) + fp.svcPortMap.Update(fp.serviceChanges) + + slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) { + if isIPv6 { + eps.AddressType = discovery.AddressTypeIPv6 + } else { + eps.AddressType = discovery.AddressTypeIPv4 + } + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{endpointIP}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p80"), + Port: pointer.Int32(80), + Protocol: &tc.protocol, + }} + }) + + // Add and then remove the endpoint slice + fp.OnEndpointSliceAdd(slice) + fp.syncProxyRules() + fp.OnEndpointSliceDelete(slice) + fp.syncProxyRules() + + // Check the executed conntrack command + if tc.protocol == UDP { + if fexec.CommandCalls != 2 { + t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls) + } + + // First clear conntrack entries for the clusterIP when the + // endpoint is first added. + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP) + if isIPv6 { + expectCommand += " -f ipv6" + } + actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + + // Then clear conntrack entries for the endpoint when it is + // deleted. + expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) + if isIPv6 { + expectCommand += " -f ipv6" + } + actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + } else if fexec.CommandCalls != 0 { + t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls) + } + + // Check the number of new glog errors + var expGlogErrs int64 + if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { + expGlogErrs = 1 + } + glogErrs := klog.Stats.Error.Lines() - priorGlogErrs + if glogErrs != expGlogErrs { + t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs) + } + }) } } @@ -384,10 +289,12 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. ipfamily := v1.IPv4Protocol + podCIDR := "10.0.0.0/8" if ipt.IsIPv6() { ipfamily = v1.IPv6Protocol + podCIDR = "fd00::/64" } - detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/8", ipt) + detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR(podCIDR, ipt) networkInterfacer := utilproxytest.NewFakeNetwork() itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}