Make deleteEndpointConnection test use syncProxyRules

Rather than calling fp.deleteEndpointConnection() directly, set up the
proxy to have syncProxyRules() call it, so that we are testing it in
the way that it actually gets called.

Squash the IPv4 and IPv6 unit tests together so we don't need to
duplicate all that code. Fix a tiny bug in NewFakeProxier() found
while doing this...
This commit is contained in:
Dan Winship 2023-01-24 11:28:55 -05:00
parent dea8e34ea7
commit c5c0d9f5bd

View File

@ -60,7 +60,7 @@ import (
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
) )
func TestDeleteEndpointConnectionsIPv4(t *testing.T) { func TestDeleteEndpointConnections(t *testing.T) {
const ( const (
UDP = v1.ProtocolUDP UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP TCP = v1.ProtocolTCP
@ -73,8 +73,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
svcIP string svcIP string
svcPort int32 svcPort int32
protocol v1.Protocol protocol v1.Protocol
endpoint string // IP:port endpoint endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test
simulatedErr string simulatedErr string
}{ }{
{ {
@ -104,140 +103,21 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
{ {
description: "V4 UDP, nothing to delete, benign error", description: "V4 UDP, nothing to delete, benign error",
svcName: "v4-udp-nothing-to-delete", svcName: "v4-udp-nothing-to-delete",
svcIP: "172.30.1.1", svcIP: "172.30.4.4",
svcPort: 80, svcPort: 80,
protocol: UDP, protocol: UDP,
endpoint: "10.240.0.3:80", endpoint: "10.240.0.6:80",
simulatedErr: conntrack.NoConnectionToDelete, simulatedErr: conntrack.NoConnectionToDelete,
}, },
{ {
description: "V4 UDP, unexpected error, should be glogged", description: "V4 UDP, unexpected error, should be glogged",
svcName: "v4-udp-simulated-error", svcName: "v4-udp-simulated-error",
svcIP: "172.30.1.1", svcIP: "172.30.5.5",
svcPort: 80, svcPort: 80,
protocol: UDP, protocol: UDP,
endpoint: "10.240.0.3:80", endpoint: "10.240.0.7:80",
simulatedErr: "simulated error", simulatedErr: "simulated error",
}, },
}
// Create a fake executor for the conntrack utility. This should only be
// invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases {
if tc.protocol == UDP {
var cmdOutput string
var simErr error
if tc.simulatedErr == "" {
cmdOutput = "1 flow entries have been deleted"
} else {
simErr = fmt.Errorf(tc.simulatedErr)
}
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
}
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.exec = fexec
for _, tc := range testCases {
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
}
// Run the test cases
for _, tc := range testCases {
priorExecs := fexec.CommandCalls
priorGlogErrs := klog.Stats.Error.Lines()
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
Protocol: tc.protocol,
}
input := []proxy.ServiceEndpoint{
{
Endpoint: tc.endpoint,
ServicePortName: svc,
},
}
fp.deleteUDPEndpointConnections(input)
// For UDP and SCTP connections, check the executed conntrack command
var expExecs int
if tc.protocol == UDP {
isIPv6 := func(ip string) bool {
netIP := netutils.ParseIPSloppy(ip)
return netIP.To4() == nil
}
endpointIP := utilproxy.IPPart(tc.endpoint)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol))))
if isIPv6(endpointIP) {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
if actualCommand != expectCommand {
t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
}
expExecs = 1
}
// Check the number of times conntrack was executed
execs := fexec.CommandCalls - priorExecs
if execs != expExecs {
t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
}
// Check the number of new glog errors
var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1
}
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
if glogErrs != expGlogErrs {
t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
}
}
}
func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
const (
UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP
SCTP = v1.ProtocolSCTP
)
testCases := []struct {
description string
svcName string
svcIP string
svcPort int32
protocol v1.Protocol
endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test
simulatedErr string
}{
{ {
description: "V6 UDP", description: "V6 UDP",
svcName: "v6-udp", svcName: "v6-udp",
@ -264,103 +144,128 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
}, },
} }
// Create a fake executor for the conntrack utility. This should only be
// invoked for UDP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases { for _, tc := range testCases {
if tc.protocol == UDP { t.Run(tc.description, func(t *testing.T) {
var cmdOutput string priorGlogErrs := klog.Stats.Error.Lines()
var simErr error
if tc.simulatedErr == "" { // Create a fake executor for the conntrack utility.
cmdOutput = "1 flow entries have been deleted" fcmd := fakeexec.FakeCmd{}
} else { fexec := &fakeexec.FakeExec{
simErr = fmt.Errorf(tc.simulatedErr) LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
} }
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr } execFunc := func(cmd string, args ...string) exec.Cmd {
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
}
ipt := iptablestest.NewIPv6Fake()
fp := NewFakeProxier(ipt)
fp.exec = fexec
for _, tc := range testCases {
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
}
// Run the test cases
for _, tc := range testCases {
priorExecs := fexec.CommandCalls
priorGlogErrs := klog.Stats.Error.Lines()
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
Protocol: tc.protocol,
}
input := []proxy.ServiceEndpoint{
{
Endpoint: tc.endpoint,
ServicePortName: svc,
},
}
fp.deleteUDPEndpointConnections(input)
// For UDP connections, check the executed conntrack command
var expExecs int
if tc.protocol == UDP {
isIPv6 := func(ip string) bool {
netIP := netutils.ParseIPSloppy(ip)
return netIP.To4() == nil
} }
if tc.protocol == UDP {
cmdOutput := "1 flow entries have been deleted"
var simErr error
// First call outputs cmdOutput and succeeds
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Second call may succeed or fail
if tc.simulatedErr != "" {
cmdOutput = ""
simErr = fmt.Errorf(tc.simulatedErr)
}
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
endpointIP := utilproxy.IPPart(tc.endpoint) endpointIP := utilproxy.IPPart(tc.endpoint)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) isIPv6 := netutils.IsIPv6String(endpointIP)
if isIPv6(endpointIP) {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
if actualCommand != expectCommand {
t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
}
expExecs = 1
}
// Check the number of times conntrack was executed var ipt utiliptables.Interface
execs := fexec.CommandCalls - priorExecs if isIPv6 {
if execs != expExecs { ipt = iptablestest.NewIPv6Fake()
t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) } else {
} ipt = iptablestest.NewFake()
}
fp := NewFakeProxier(ipt)
fp.exec = fexec
// Check the number of new glog errors makeServiceMap(fp,
var expGlogErrs int64 makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { svc.Spec.ClusterIP = tc.svcIP
expGlogErrs = 1 svc.Spec.Ports = []v1.ServicePort{{
} Name: "p80",
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs Port: tc.svcPort,
if glogErrs != expGlogErrs { Protocol: tc.protocol,
t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) }}
} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) {
if isIPv6 {
eps.AddressType = discovery.AddressTypeIPv6
} else {
eps.AddressType = discovery.AddressTypeIPv4
}
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{endpointIP},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tc.protocol,
}}
})
// Add and then remove the endpoint slice
fp.OnEndpointSliceAdd(slice)
fp.syncProxyRules()
fp.OnEndpointSliceDelete(slice)
fp.syncProxyRules()
// Check the executed conntrack command
if tc.protocol == UDP {
if fexec.CommandCalls != 2 {
t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls)
}
// First clear conntrack entries for the clusterIP when the
// endpoint is first added.
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP)
if isIPv6 {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
// Then clear conntrack entries for the endpoint when it is
// deleted.
expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
if isIPv6 {
expectCommand += " -f ipv6"
}
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
} else if fexec.CommandCalls != 0 {
t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls)
}
// Check the number of new glog errors
var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1
}
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
if glogErrs != expGlogErrs {
t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs)
}
})
} }
} }
@ -384,10 +289,12 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine // TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method. // invocation into a Run() method.
ipfamily := v1.IPv4Protocol ipfamily := v1.IPv4Protocol
podCIDR := "10.0.0.0/8"
if ipt.IsIPv6() { if ipt.IsIPv6() {
ipfamily = v1.IPv6Protocol ipfamily = v1.IPv6Protocol
podCIDR = "fd00::/64"
} }
detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/8", ipt) detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR(podCIDR, ipt)
networkInterfacer := utilproxytest.NewFakeNetwork() networkInterfacer := utilproxytest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0} itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}