mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Remove redundant iptables/nftables conntrack cleanup tests
The iptables and nftables proxy backends had 2 unit tests (TestDeleteEndpointConnections and TestProxierDeleteNodePortStaleUDP) that were effectively testing that: - If the proxy saw various Service/EndpointSlice events this would result in specific changes to the service/endpoints trackers, AND - If the service/endpoints trackers changed in those specific ways this would result in specific UpdateServiceMapResult and UpdateEndpointsMapResult values being generated, AND - If you passed those specific UpdateServiceMapResult and UpdateEndpointsMapResult values to conntrack.CleanStaleEntries it would make specific calls to the lower-level conntrack methods, AND - If you called the lower-level conntrack methods with those specific arguments, it would result in specific executions of the conntrack binary, mixed with a specific number of klog invocations. This... is not a good unit test. We already test the change tracker behavior in other unit tests, and we already tested the Update{Service,Endpoints}MapResult behavior in the pkg/proxy unit tests, and we already tested the conntrack exec behavior in pkg/proxy/conntrack/conntrack_test.go, and we now test the CleanStaleEntries behavior in pkg/proxy/conntrack/cleanup_test.go. So there is no need to try to test the top-to-bottom behavior as a "unit test".
This commit is contained in:
parent
db12cbe2ae
commit
cdf934d5bc
@ -56,8 +56,8 @@ type execCT struct {
|
||||
|
||||
var _ Interface = &execCT{}
|
||||
|
||||
// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found
|
||||
const NoConnectionToDelete = "0 flow entries have been deleted"
|
||||
// noConnectionToDelete is the error string returned by conntrack when no matching connections are found
|
||||
const noConnectionToDelete = "0 flow entries have been deleted"
|
||||
|
||||
func protoStr(proto v1.Protocol) string {
|
||||
return strings.ToLower(string(proto))
|
||||
@ -89,7 +89,7 @@ func (ct *execCT) exec(parameters ...string) error {
|
||||
func (ct *execCT) ClearEntriesForIP(ip string, protocol v1.Protocol) error {
|
||||
parameters := parametersWithFamily(utilnet.IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol))
|
||||
err := ct.exec(parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby-sit all udp connections to kubernetes services.
|
||||
@ -105,7 +105,7 @@ func (ct *execCT) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protoco
|
||||
}
|
||||
parameters := parametersWithFamily(isIPv6, "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port))
|
||||
err := ct.exec(parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
|
||||
}
|
||||
return nil
|
||||
@ -116,7 +116,7 @@ func (ct *execCT) ClearEntriesForNAT(origin, dest string, protocol v1.Protocol)
|
||||
parameters := parametersWithFamily(utilnet.IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest,
|
||||
"-p", protoStr(protocol))
|
||||
err := ct.exec(parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby sit all udp connections to kubernetes services.
|
||||
@ -132,7 +132,7 @@ func (ct *execCT) ClearEntriesForPortNAT(dest string, port int, protocol v1.Prot
|
||||
}
|
||||
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
|
||||
err := ct.exec(parameters...)
|
||||
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
|
||||
}
|
||||
return nil
|
||||
|
@ -48,7 +48,6 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/conntrack"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
@ -58,221 +57,11 @@ import (
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
|
||||
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
|
||||
"k8s.io/utils/exec"
|
||||
fakeexec "k8s.io/utils/exec/testing"
|
||||
netutils "k8s.io/utils/net"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func TestDeleteEndpointConnections(t *testing.T) {
|
||||
const (
|
||||
UDP = v1.ProtocolUDP
|
||||
TCP = v1.ProtocolTCP
|
||||
SCTP = v1.ProtocolSCTP
|
||||
)
|
||||
|
||||
testCases := []struct {
|
||||
description string
|
||||
svcName string
|
||||
svcIP string
|
||||
svcPort int32
|
||||
protocol v1.Protocol
|
||||
endpoint string // IP:port endpoint
|
||||
simulatedErr string
|
||||
}{
|
||||
{
|
||||
description: "V4 UDP",
|
||||
svcName: "v4-udp",
|
||||
svcIP: "172.30.1.1",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.3:80",
|
||||
},
|
||||
{
|
||||
description: "V4 TCP",
|
||||
svcName: "v4-tcp",
|
||||
svcIP: "172.30.2.2",
|
||||
svcPort: 80,
|
||||
protocol: TCP,
|
||||
endpoint: "10.240.0.4:80",
|
||||
},
|
||||
{
|
||||
description: "V4 SCTP",
|
||||
svcName: "v4-sctp",
|
||||
svcIP: "172.30.3.3",
|
||||
svcPort: 80,
|
||||
protocol: SCTP,
|
||||
endpoint: "10.240.0.5:80",
|
||||
},
|
||||
{
|
||||
description: "V4 UDP, nothing to delete, benign error",
|
||||
svcName: "v4-udp-nothing-to-delete",
|
||||
svcIP: "172.30.4.4",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.6:80",
|
||||
simulatedErr: conntrack.NoConnectionToDelete,
|
||||
},
|
||||
{
|
||||
description: "V4 UDP, unexpected error, should be glogged",
|
||||
svcName: "v4-udp-simulated-error",
|
||||
svcIP: "172.30.5.5",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.7:80",
|
||||
simulatedErr: "simulated error",
|
||||
},
|
||||
{
|
||||
description: "V6 UDP",
|
||||
svcName: "v6-udp",
|
||||
svcIP: "fd00:1234::20",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "[2001:db8::2]:80",
|
||||
},
|
||||
{
|
||||
description: "V6 TCP",
|
||||
svcName: "v6-tcp",
|
||||
svcIP: "fd00:1234::30",
|
||||
svcPort: 80,
|
||||
protocol: TCP,
|
||||
endpoint: "[2001:db8::3]:80",
|
||||
},
|
||||
{
|
||||
description: "V6 SCTP",
|
||||
svcName: "v6-sctp",
|
||||
svcIP: "fd00:1234::40",
|
||||
svcPort: 80,
|
||||
protocol: SCTP,
|
||||
endpoint: "[2001:db8::4]:80",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
priorGlogErrs := klog.Stats.Error.Lines()
|
||||
|
||||
// Create a fake executor for the conntrack utility.
|
||||
fcmd := fakeexec.FakeCmd{}
|
||||
fexec := &fakeexec.FakeExec{
|
||||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
execFunc := func(cmd string, args ...string) exec.Cmd {
|
||||
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
|
||||
}
|
||||
|
||||
if tc.protocol == UDP {
|
||||
cmdOutput := "1 flow entries have been deleted"
|
||||
var simErr error
|
||||
|
||||
// First call outputs cmdOutput and succeeds
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
|
||||
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil },
|
||||
)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
|
||||
// Second call may succeed or fail
|
||||
if tc.simulatedErr != "" {
|
||||
cmdOutput = ""
|
||||
simErr = fmt.Errorf(tc.simulatedErr)
|
||||
}
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
|
||||
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr },
|
||||
)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
}
|
||||
|
||||
endpointIP := proxyutil.IPPart(tc.endpoint)
|
||||
isIPv6 := netutils.IsIPv6String(endpointIP)
|
||||
|
||||
var ipt utiliptables.Interface
|
||||
if isIPv6 {
|
||||
ipt = iptablestest.NewIPv6Fake()
|
||||
} else {
|
||||
ipt = iptablestest.NewFake()
|
||||
}
|
||||
fp := NewFakeProxier(ipt)
|
||||
fp.exec = fexec
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
|
||||
svc.Spec.ClusterIP = tc.svcIP
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: "p80",
|
||||
Port: tc.svcPort,
|
||||
Protocol: tc.protocol,
|
||||
}}
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
|
||||
}),
|
||||
)
|
||||
fp.svcPortMap.Update(fp.serviceChanges)
|
||||
|
||||
slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) {
|
||||
if isIPv6 {
|
||||
eps.AddressType = discovery.AddressTypeIPv6
|
||||
} else {
|
||||
eps.AddressType = discovery.AddressTypeIPv4
|
||||
}
|
||||
eps.Endpoints = []discovery.Endpoint{{
|
||||
Addresses: []string{endpointIP},
|
||||
}}
|
||||
eps.Ports = []discovery.EndpointPort{{
|
||||
Name: ptr.To("p80"),
|
||||
Port: ptr.To[int32](80),
|
||||
Protocol: ptr.To(tc.protocol),
|
||||
}}
|
||||
})
|
||||
|
||||
// Add and then remove the endpoint slice
|
||||
fp.OnEndpointSliceAdd(slice)
|
||||
fp.syncProxyRules()
|
||||
fp.OnEndpointSliceDelete(slice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Check the executed conntrack command
|
||||
if tc.protocol == UDP {
|
||||
if fexec.CommandCalls != 2 {
|
||||
t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls)
|
||||
}
|
||||
|
||||
// First clear conntrack entries for the clusterIP when the
|
||||
// endpoint is first added.
|
||||
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP)
|
||||
if isIPv6 {
|
||||
expectCommand += " -f ipv6"
|
||||
}
|
||||
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
|
||||
if actualCommand != expectCommand {
|
||||
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
|
||||
}
|
||||
|
||||
// Then clear conntrack entries for the endpoint when it is
|
||||
// deleted.
|
||||
expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
|
||||
if isIPv6 {
|
||||
expectCommand += " -f ipv6"
|
||||
}
|
||||
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
|
||||
if actualCommand != expectCommand {
|
||||
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
|
||||
}
|
||||
} else if fexec.CommandCalls != 0 {
|
||||
t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls)
|
||||
}
|
||||
|
||||
// Check the number of new glog errors
|
||||
var expGlogErrs int64
|
||||
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
|
||||
expGlogErrs = 1
|
||||
}
|
||||
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
|
||||
if glogErrs != expGlogErrs {
|
||||
t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Conventions for tests using NewFakeProxier:
|
||||
//
|
||||
// Pod IPs: 10.0.0.0/8
|
||||
@ -4337,139 +4126,6 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
|
||||
fcmd := fakeexec.FakeCmd{}
|
||||
fexec := &fakeexec.FakeExec{
|
||||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
execFunc := func(cmd string, args ...string) exec.Cmd {
|
||||
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
|
||||
}
|
||||
cmdOutput := "1 flow entries have been deleted"
|
||||
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }
|
||||
|
||||
// Delete ClusterIP entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
// Delete ExternalIP entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
// Delete LoadBalancerIP entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
// Delete NodePort entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt)
|
||||
fp.exec = fexec
|
||||
|
||||
svcIP := "172.30.0.41"
|
||||
extIP := "192.168.99.11"
|
||||
lbIngressIP := "1.2.3.4"
|
||||
svcPort := 80
|
||||
nodePort := 31201
|
||||
svcPortName := proxy.ServicePortName{
|
||||
NamespacedName: makeNSN("ns1", "svc1"),
|
||||
Port: "p80",
|
||||
Protocol: v1.ProtocolUDP,
|
||||
}
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.ExternalIPs = []string{extIP}
|
||||
svc.Spec.Type = "LoadBalancer"
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolUDP,
|
||||
NodePort: int32(nodePort),
|
||||
}}
|
||||
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
|
||||
IP: lbIngressIP,
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
if fexec.CommandCalls != 0 {
|
||||
t.Fatalf("Created service without endpoints must not clear conntrack entries")
|
||||
}
|
||||
|
||||
epIP := "10.180.0.1"
|
||||
populateEndpointSlices(fp,
|
||||
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
|
||||
eps.AddressType = discovery.AddressTypeIPv4
|
||||
eps.Endpoints = []discovery.Endpoint{{
|
||||
Addresses: []string{epIP},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Serving: ptr.To(false),
|
||||
},
|
||||
}}
|
||||
eps.Ports = []discovery.EndpointPort{{
|
||||
Name: ptr.To(svcPortName.Port),
|
||||
Port: ptr.To(int32(svcPort)),
|
||||
Protocol: ptr.To(v1.ProtocolUDP),
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
if fexec.CommandCalls != 0 {
|
||||
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
|
||||
}
|
||||
|
||||
populateEndpointSlices(fp,
|
||||
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
|
||||
eps.AddressType = discovery.AddressTypeIPv4
|
||||
eps.Endpoints = []discovery.Endpoint{{
|
||||
Addresses: []string{epIP},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Serving: ptr.To(true),
|
||||
},
|
||||
}}
|
||||
eps.Ports = []discovery.EndpointPort{{
|
||||
Name: ptr.To(svcPortName.Port),
|
||||
Port: ptr.To(int32(svcPort)),
|
||||
Protocol: ptr.To(v1.ProtocolUDP),
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
if fexec.CommandCalls != 4 {
|
||||
t.Fatalf("Updated UDP service with new endpoints must clear UDP entries 4 times: ClusterIP, NodePort, ExternalIP and LB")
|
||||
}
|
||||
|
||||
// the order is not guaranteed so we have to compare the strings in any order
|
||||
expectedCommands := []string{
|
||||
// Delete ClusterIP Conntrack entries
|
||||
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))),
|
||||
// Delete ExternalIP Conntrack entries
|
||||
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", extIP, strings.ToLower(string((v1.ProtocolUDP)))),
|
||||
// Delete LoadBalancerIP Conntrack entries
|
||||
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", lbIngressIP, strings.ToLower(string((v1.ProtocolUDP)))),
|
||||
// Delete NodePort Conntrack entrie
|
||||
fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort),
|
||||
}
|
||||
actualCommands := []string{
|
||||
strings.Join(fcmd.CombinedOutputLog[0], " "),
|
||||
strings.Join(fcmd.CombinedOutputLog[1], " "),
|
||||
strings.Join(fcmd.CombinedOutputLog[2], " "),
|
||||
strings.Join(fcmd.CombinedOutputLog[3], " "),
|
||||
}
|
||||
sort.Strings(expectedCommands)
|
||||
sort.Strings(actualCommands)
|
||||
|
||||
if !reflect.DeepEqual(expectedCommands, actualCommands) {
|
||||
t.Errorf("Expected commands: %v, but executed %v", expectedCommands, actualCommands)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxierMetricsIptablesTotalRules(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
fp := NewFakeProxier(ipt)
|
||||
|
@ -23,8 +23,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -39,10 +37,8 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/proxy/conntrack"
|
||||
"k8s.io/kubernetes/pkg/proxy/metrics"
|
||||
|
||||
"k8s.io/kubernetes/pkg/proxy/healthcheck"
|
||||
@ -50,213 +46,11 @@ import (
|
||||
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
|
||||
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
|
||||
"k8s.io/kubernetes/pkg/util/async"
|
||||
"k8s.io/utils/exec"
|
||||
fakeexec "k8s.io/utils/exec/testing"
|
||||
netutils "k8s.io/utils/net"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
func TestDeleteEndpointConnections(t *testing.T) {
|
||||
const (
|
||||
UDP = v1.ProtocolUDP
|
||||
TCP = v1.ProtocolTCP
|
||||
SCTP = v1.ProtocolSCTP
|
||||
)
|
||||
|
||||
testCases := []struct {
|
||||
description string
|
||||
svcName string
|
||||
svcIP string
|
||||
svcPort int32
|
||||
protocol v1.Protocol
|
||||
endpoint string // IP:port endpoint
|
||||
simulatedErr string
|
||||
}{
|
||||
{
|
||||
description: "V4 UDP",
|
||||
svcName: "v4-udp",
|
||||
svcIP: "172.30.1.1",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.3:80",
|
||||
},
|
||||
{
|
||||
description: "V4 TCP",
|
||||
svcName: "v4-tcp",
|
||||
svcIP: "172.30.2.2",
|
||||
svcPort: 80,
|
||||
protocol: TCP,
|
||||
endpoint: "10.240.0.4:80",
|
||||
},
|
||||
{
|
||||
description: "V4 SCTP",
|
||||
svcName: "v4-sctp",
|
||||
svcIP: "172.30.3.3",
|
||||
svcPort: 80,
|
||||
protocol: SCTP,
|
||||
endpoint: "10.240.0.5:80",
|
||||
},
|
||||
{
|
||||
description: "V4 UDP, nothing to delete, benign error",
|
||||
svcName: "v4-udp-nothing-to-delete",
|
||||
svcIP: "172.30.4.4",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.6:80",
|
||||
simulatedErr: conntrack.NoConnectionToDelete,
|
||||
},
|
||||
{
|
||||
description: "V4 UDP, unexpected error, should be glogged",
|
||||
svcName: "v4-udp-simulated-error",
|
||||
svcIP: "172.30.5.5",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "10.240.0.7:80",
|
||||
simulatedErr: "simulated error",
|
||||
},
|
||||
{
|
||||
description: "V6 UDP",
|
||||
svcName: "v6-udp",
|
||||
svcIP: "fd00:1234::20",
|
||||
svcPort: 80,
|
||||
protocol: UDP,
|
||||
endpoint: "[2001:db8::2]:80",
|
||||
},
|
||||
{
|
||||
description: "V6 TCP",
|
||||
svcName: "v6-tcp",
|
||||
svcIP: "fd00:1234::30",
|
||||
svcPort: 80,
|
||||
protocol: TCP,
|
||||
endpoint: "[2001:db8::3]:80",
|
||||
},
|
||||
{
|
||||
description: "V6 SCTP",
|
||||
svcName: "v6-sctp",
|
||||
svcIP: "fd00:1234::40",
|
||||
svcPort: 80,
|
||||
protocol: SCTP,
|
||||
endpoint: "[2001:db8::4]:80",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.description, func(t *testing.T) {
|
||||
priorGlogErrs := klog.Stats.Error.Lines()
|
||||
|
||||
// Create a fake executor for the conntrack utility.
|
||||
fcmd := fakeexec.FakeCmd{}
|
||||
fexec := &fakeexec.FakeExec{
|
||||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
execFunc := func(cmd string, args ...string) exec.Cmd {
|
||||
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
|
||||
}
|
||||
|
||||
if tc.protocol == UDP {
|
||||
cmdOutput := "1 flow entries have been deleted"
|
||||
var simErr error
|
||||
|
||||
// First call outputs cmdOutput and succeeds
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
|
||||
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil },
|
||||
)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
|
||||
// Second call may succeed or fail
|
||||
if tc.simulatedErr != "" {
|
||||
cmdOutput = ""
|
||||
simErr = fmt.Errorf(tc.simulatedErr)
|
||||
}
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
|
||||
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr },
|
||||
)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
}
|
||||
|
||||
endpointIP := proxyutil.IPPart(tc.endpoint)
|
||||
_, fp := NewFakeProxier(proxyutil.GetIPFamilyFromIP(netutils.ParseIPSloppy(endpointIP)))
|
||||
fp.exec = fexec
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
|
||||
svc.Spec.ClusterIP = tc.svcIP
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: "p80",
|
||||
Port: tc.svcPort,
|
||||
Protocol: tc.protocol,
|
||||
}}
|
||||
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
|
||||
}),
|
||||
)
|
||||
fp.svcPortMap.Update(fp.serviceChanges)
|
||||
|
||||
slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) {
|
||||
if fp.ipFamily == v1.IPv6Protocol {
|
||||
eps.AddressType = discovery.AddressTypeIPv6
|
||||
} else {
|
||||
eps.AddressType = discovery.AddressTypeIPv4
|
||||
}
|
||||
eps.Endpoints = []discovery.Endpoint{{
|
||||
Addresses: []string{endpointIP},
|
||||
}}
|
||||
eps.Ports = []discovery.EndpointPort{{
|
||||
Name: ptr.To("p80"),
|
||||
Port: ptr.To[int32](80),
|
||||
Protocol: ptr.To(tc.protocol),
|
||||
}}
|
||||
})
|
||||
|
||||
// Add and then remove the endpoint slice
|
||||
fp.OnEndpointSliceAdd(slice)
|
||||
fp.syncProxyRules()
|
||||
fp.OnEndpointSliceDelete(slice)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Check the executed conntrack command
|
||||
if tc.protocol == UDP {
|
||||
if fexec.CommandCalls != 2 {
|
||||
t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls)
|
||||
}
|
||||
|
||||
// First clear conntrack entries for the clusterIP when the
|
||||
// endpoint is first added.
|
||||
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP)
|
||||
if fp.ipFamily == v1.IPv6Protocol {
|
||||
expectCommand += " -f ipv6"
|
||||
}
|
||||
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
|
||||
if actualCommand != expectCommand {
|
||||
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
|
||||
}
|
||||
|
||||
// Then clear conntrack entries for the endpoint when it is
|
||||
// deleted.
|
||||
expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
|
||||
if fp.ipFamily == v1.IPv6Protocol {
|
||||
expectCommand += " -f ipv6"
|
||||
}
|
||||
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
|
||||
if actualCommand != expectCommand {
|
||||
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
|
||||
}
|
||||
} else if fexec.CommandCalls != 0 {
|
||||
t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls)
|
||||
}
|
||||
|
||||
// Check the number of new glog errors
|
||||
var expGlogErrs int64
|
||||
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
|
||||
expGlogErrs = 1
|
||||
}
|
||||
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
|
||||
if glogErrs != expGlogErrs {
|
||||
t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Conventions for tests using NewFakeProxier:
|
||||
//
|
||||
// Pod IPs: 10.0.0.0/8
|
||||
@ -2737,138 +2531,6 @@ func TestHealthCheckNodePortWhenTerminating(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
|
||||
fcmd := fakeexec.FakeCmd{}
|
||||
fexec := &fakeexec.FakeExec{
|
||||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
execFunc := func(cmd string, args ...string) exec.Cmd {
|
||||
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
|
||||
}
|
||||
cmdOutput := "1 flow entries have been deleted"
|
||||
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }
|
||||
|
||||
// Delete ClusterIP entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
// Delete ExternalIP entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
// Delete LoadBalancerIP entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
// Delete NodePort entries
|
||||
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
|
||||
fexec.CommandScript = append(fexec.CommandScript, execFunc)
|
||||
|
||||
_, fp := NewFakeProxier(v1.IPv4Protocol)
|
||||
fp.exec = fexec
|
||||
|
||||
svcIP := "172.30.0.41"
|
||||
extIP := "192.168.99.11"
|
||||
lbIngressIP := "1.2.3.4"
|
||||
svcPort := 80
|
||||
nodePort := 31201
|
||||
svcPortName := proxy.ServicePortName{
|
||||
NamespacedName: makeNSN("ns1", "svc1"),
|
||||
Port: "p80",
|
||||
Protocol: v1.ProtocolUDP,
|
||||
}
|
||||
|
||||
makeServiceMap(fp,
|
||||
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
|
||||
svc.Spec.ClusterIP = svcIP
|
||||
svc.Spec.ExternalIPs = []string{extIP}
|
||||
svc.Spec.Type = "LoadBalancer"
|
||||
svc.Spec.Ports = []v1.ServicePort{{
|
||||
Name: svcPortName.Port,
|
||||
Port: int32(svcPort),
|
||||
Protocol: v1.ProtocolUDP,
|
||||
NodePort: int32(nodePort),
|
||||
}}
|
||||
svc.Status.LoadBalancer.Ingress = []v1.LoadBalancerIngress{{
|
||||
IP: lbIngressIP,
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
if fexec.CommandCalls != 0 {
|
||||
t.Fatalf("Created service without endpoints must not clear conntrack entries")
|
||||
}
|
||||
|
||||
epIP := "10.180.0.1"
|
||||
populateEndpointSlices(fp,
|
||||
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
|
||||
eps.AddressType = discovery.AddressTypeIPv4
|
||||
eps.Endpoints = []discovery.Endpoint{{
|
||||
Addresses: []string{epIP},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Serving: ptr.To(false),
|
||||
},
|
||||
}}
|
||||
eps.Ports = []discovery.EndpointPort{{
|
||||
Name: ptr.To(svcPortName.Port),
|
||||
Port: ptr.To(int32(svcPort)),
|
||||
Protocol: ptr.To(v1.ProtocolUDP),
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
if fexec.CommandCalls != 0 {
|
||||
t.Fatalf("Updated UDP service with not ready endpoints must not clear UDP entries")
|
||||
}
|
||||
|
||||
populateEndpointSlices(fp,
|
||||
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
|
||||
eps.AddressType = discovery.AddressTypeIPv4
|
||||
eps.Endpoints = []discovery.Endpoint{{
|
||||
Addresses: []string{epIP},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Serving: ptr.To(true),
|
||||
},
|
||||
}}
|
||||
eps.Ports = []discovery.EndpointPort{{
|
||||
Name: ptr.To(svcPortName.Port),
|
||||
Port: ptr.To(int32(svcPort)),
|
||||
Protocol: ptr.To(v1.ProtocolUDP),
|
||||
}}
|
||||
}),
|
||||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
if fexec.CommandCalls != 4 {
|
||||
t.Fatalf("Updated UDP service with new endpoints must clear UDP entries 4 times: ClusterIP, NodePort, ExternalIP and LB")
|
||||
}
|
||||
|
||||
// the order is not guaranteed so we have to compare the strings in any order
|
||||
expectedCommands := []string{
|
||||
// Delete ClusterIP Conntrack entries
|
||||
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", svcIP, strings.ToLower(string((v1.ProtocolUDP)))),
|
||||
// Delete ExternalIP Conntrack entries
|
||||
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", extIP, strings.ToLower(string((v1.ProtocolUDP)))),
|
||||
// Delete LoadBalancerIP Conntrack entries
|
||||
fmt.Sprintf("conntrack -D --orig-dst %s -p %s", lbIngressIP, strings.ToLower(string((v1.ProtocolUDP)))),
|
||||
// Delete NodePort Conntrack entrie
|
||||
fmt.Sprintf("conntrack -D -p %s --dport %d", strings.ToLower(string((v1.ProtocolUDP))), nodePort),
|
||||
}
|
||||
actualCommands := []string{
|
||||
strings.Join(fcmd.CombinedOutputLog[0], " "),
|
||||
strings.Join(fcmd.CombinedOutputLog[1], " "),
|
||||
strings.Join(fcmd.CombinedOutputLog[2], " "),
|
||||
strings.Join(fcmd.CombinedOutputLog[3], " "),
|
||||
}
|
||||
sort.Strings(expectedCommands)
|
||||
sort.Strings(actualCommands)
|
||||
|
||||
if !reflect.DeepEqual(expectedCommands, actualCommands) {
|
||||
t.Errorf("Expected commands: %v, but executed %v", expectedCommands, actualCommands)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces.
|
||||
|
||||
// This test ensures that the iptables proxier supports translating Endpoints to
|
||||
|
Loading…
Reference in New Issue
Block a user