Plumb the conntrack.Interface up to the proxiers

And use the fake interface in the unit tests, removing the need to set
up FakeExec scaffolding whenever conntrack cleanup is invoked.

Also, remove the isIPv6 argument to CleanStaleEntries, because it can
be inferred from the other args.
Dan Winship 2023-12-22 17:24:39 -05:00
parent cdf934d5bc
commit fcb51554a1
9 changed files with 80 additions and 64 deletions
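
For orientation, a rough sketch of the conntrack.Interface shape this commit plumbs into the proxiers. The method set below is an assumption inferred from how the callers in the diffs use it, not the package's authoritative definition (only NewExec and execCT are visible in the diffs themselves):

    package conntrack

    import (
    	v1 "k8s.io/api/core/v1"
    )

    // Interface abstracts conntrack cleanup so each proxier can hold it as a
    // plain dependency and unit tests can inject a recording fake.
    // Sketch only; method names are assumptions based on the usage below.
    type Interface interface {
    	// ClearEntriesForIP deletes entries for connections to a service IP.
    	ClearEntriesForIP(ip string, protocol v1.Protocol) error
    	// ClearEntriesForPort deletes entries for connections to a node port.
    	ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error
    	// ClearEntriesForNAT deletes entries for a service IP / endpoint IP pair.
    	ClearEntriesForNAT(origin, dest string, protocol v1.Protocol) error
    	// ClearEntriesForPortNAT deletes entries for a node port / endpoint IP pair.
    	ClearEntriesForPortNAT(dest string, port int, protocol v1.Protocol) error
    }

As the proxier diffs below show, the real implementation is constructed with conntrack.NewExec(exec) and the unit tests substitute conntrack.NewFake().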

View File

@ -25,19 +25,13 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/utils/exec"
netutils "k8s.io/utils/net"
)
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap,
func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap,
serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
ct := &execCT{exec}
cleanStaleEntries(ct, isIPv6, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
}
func cleanStaleEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMap,
serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
deleteStaleServiceConntrackEntries(ct, isIPv6, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
deleteStaleServiceConntrackEntries(ct, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult)
}
@ -45,9 +39,10 @@ func cleanStaleEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMa
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
// may create "black hole" entries for that IP+port. When the service gets endpoints we
// need to delete those entries so further traffic doesn't get dropped.
func deleteStaleServiceConntrackEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.New[int]()
isIPv6 := false
// merge newly active services gathered from endpointsUpdateResult
// a UDP service that changes from 0 to non-0 endpoints is newly active.
@ -64,6 +59,7 @@ func deleteStaleServiceConntrackEntries(ct Interface, isIPv6 bool, svcPortMap pr
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
conntrackCleanupServiceNodePorts.Insert(nodePort)
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
}
}
}
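
With the parameter removed, the IP family is derived where it is actually needed: the hunk above sets isIPv6 from the ClusterIP of a UDP service that has a node port, which works because a single proxier instance only ever serves one IP family. The same inference as a standalone sketch (hypothetical helper; proxy and netutils "k8s.io/utils/net" are already imported in this file):

    // isIPv6Family reports whether this proxier's services are IPv6, inferred
    // from any ClusterIP in the map; every service in one proxier shares a family.
    func isIPv6Family(svcPortMap proxy.ServicePortMap) bool {
    	for _, svcPort := range svcPortMap {
    		return netutils.IsIPv6(svcPort.ClusterIP())
    	}
    	return false
    }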

View File

@ -245,7 +245,7 @@ func TestCleanStaleEntries(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
fake := NewFake()
cleanStaleEntries(fake, false, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates)
CleanStaleEntries(fake, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates)
if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) {
t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs)
}
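
With the fake injected directly, TestCleanStaleEntries no longer needs any FakeExec scripting; it simply asserts on what the fake recorded. A rough sketch of such a recording fake (only ClearedIPs is visible in the diff; the other field, the v1/sets imports, and the method bodies are assumptions):

    // Fake records what would have been cleared instead of exec-ing conntrack.
    type Fake struct {
    	ClearedIPs   sets.Set[string]
    	ClearedPorts sets.Set[int]
    }

    func NewFake() *Fake {
    	return &Fake{
    		ClearedIPs:   sets.New[string](),
    		ClearedPorts: sets.New[int](),
    	}
    }

    func (f *Fake) ClearEntriesForIP(ip string, protocol v1.Protocol) error {
    	f.ClearedIPs.Insert(ip)
    	return nil
    }

    func (f *Fake) ClearEntriesForPort(port int, isIPv6 bool, protocol v1.Protocol) error {
    	f.ClearedPorts.Insert(port)
    	return nil
    }

    // The remaining Interface methods would record NAT tuples the same way.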

View File

@ -56,6 +56,10 @@ type execCT struct {
var _ Interface = &execCT{}
func NewExec(execer exec.Interface) Interface {
return &execCT{execer: execer}
}
// noConnectionToDelete is the error string returned by conntrack when no matching connections are found
const noConnectionToDelete = "0 flow entries have been deleted"
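
The constant suggests how the exec-backed implementation presumably handles conntrack's "nothing matched" case: the binary exits non-zero, but for cleanup purposes that is a success. A hedged sketch of the pattern (the helper name and error wording are assumptions, not this file's actual code; fmt and strings would also need importing):

    // exec runs the conntrack binary via the injected execer, treating the
    // "0 flow entries have been deleted" response as a no-op rather than an error.
    func (ct *execCT) exec(parameters ...string) error {
    	output, err := ct.execer.Command("conntrack", parameters...).CombinedOutput()
    	if err != nil && !strings.Contains(string(output), noConnectionToDelete) {
    		return fmt.Errorf("conntrack command returned %q, error: %w", string(output), err)
    	}
    	return nil
    }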

View File

@ -170,7 +170,7 @@ type Proxier struct {
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
conntrack conntrack.Interface
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
@ -283,7 +283,7 @@ func NewProxier(ipFamily v1.IPFamily,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
conntrack: conntrack.NewExec(exec),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
@ -1538,7 +1538,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {

View File

@ -48,6 +48,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/metrics"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
@ -57,7 +58,6 @@ import (
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -113,7 +113,6 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
networkInterfacer.AddInterfaceAddr(&itf1, addrs1)
p := &Proxier{
exec: &fakeexec.FakeExec{},
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
@ -121,6 +120,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
needFullSync: true,
iptables: ipt,
masqueradeMark: "0x4000",
conntrack: conntrack.NewFake(),
localDetector: detectLocal,
hostname: testHostname,
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
@ -1915,22 +1915,23 @@ func TestClusterIPGeneral(t *testing.T) {
TargetPort: intstr.FromInt32(8443),
},
{
// Of course this should really be UDP, but if we
// create a service with UDP ports, the Proxier will
// try to do conntrack cleanup and we'd have to set
// the FakeExec up to be able to deal with that...
Name: "dns-sctp",
Name: "dns-udp",
Port: 53,
Protocol: v1.ProtocolSCTP,
Protocol: v1.ProtocolUDP,
},
{
Name: "dns-tcp",
Port: 53,
Protocol: v1.ProtocolTCP,
// We use TargetPort on TCP but not SCTP to help
// disambiguate the output.
// We use TargetPort on TCP but not UDP/SCTP to
// help disambiguate the output.
TargetPort: intstr.FromInt32(5353),
},
{
Name: "dns-sctp",
Port: 53,
Protocol: v1.ProtocolSCTP,
},
}
}),
)
@ -1972,15 +1973,20 @@ func TestClusterIPGeneral(t *testing.T) {
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Name: ptr.To("dns-udp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
Protocol: ptr.To(v1.ProtocolUDP),
},
{
Name: ptr.To("dns-tcp"),
Port: ptr.To[int32](5353),
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
},
}
}),
)
@ -2021,7 +2027,7 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (TCP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolTCP,
destIP: "172.30.0.42",
@ -2030,7 +2036,16 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (SCTP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (UDP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolUDP,
destIP: "172.30.0.42",
destPort: 53,
output: "10.180.0.1:53, 10.180.2.1:53",
masq: false,
},
{
name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolSCTP,
destIP: "172.30.0.42",

View File

@ -256,7 +256,7 @@ type Proxier struct {
iptables utiliptables.Interface
ipvs utilipvs.Interface
ipset utilipset.Interface
exec utilexec.Interface
conntrack conntrack.Interface
masqueradeAll bool
masqueradeMark string
localDetector proxyutiliptables.LocalTrafficDetector
@ -433,7 +433,7 @@ func NewProxier(ipFamily v1.IPFamily,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
conntrack: conntrack.NewExec(exec),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
@ -1482,7 +1482,7 @@ func (proxier *Proxier) syncProxyRules() {
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed

View File

@ -41,6 +41,7 @@ import (
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset"
ipsettest "k8s.io/kubernetes/pkg/proxy/ipvs/ipset/testing"
@ -54,8 +55,6 @@ import (
"k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -131,26 +130,12 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol)
netlinkHandle.SetLocalAddresses("eth0", nodeIPs...)
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("dummy device have been created"), nil, nil },
func() ([]byte, []byte, error) { return []byte(""), nil, nil },
},
}
fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
// initialize ipsetList with all sets we needed
ipsetList := make(map[string]*IPSet)
for _, is := range ipsetInfo {
ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment)
}
p := &Proxier{
exec: fexec,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
@ -159,6 +144,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
conntrack: conntrack.NewFake(),
strictARP: false,
localDetector: proxyutiliptables.NewNoOpLocalDetector(),
hostname: testHostname,

View File

@ -162,7 +162,7 @@ type Proxier struct {
nftables knftables.Interface
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
conntrack conntrack.Interface
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
@ -236,7 +236,7 @@ func NewProxier(ipFamily v1.IPFamily,
nftables: nft,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: utilexec.New(),
conntrack: conntrack.NewExec(utilexec.New()),
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
@ -1548,7 +1548,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {

View File

@ -39,6 +39,7 @@ import (
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/metrics"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
@ -46,7 +47,6 @@ import (
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing"
"k8s.io/kubernetes/pkg/util/async"
fakeexec "k8s.io/utils/exec/testing"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -105,13 +105,13 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
p := &Proxier{
ipFamily: ipFamily,
exec: &fakeexec.FakeExec{},
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil),
nftables: nft,
masqueradeMark: "0x4000",
conntrack: conntrack.NewFake(),
localDetector: detectLocal,
hostname: testHostname,
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
@ -534,22 +534,23 @@ func TestClusterIPGeneral(t *testing.T) {
TargetPort: intstr.FromInt32(8443),
},
{
// Of course this should really be UDP, but if we
// create a service with UDP ports, the Proxier will
// try to do conntrack cleanup and we'd have to set
// the FakeExec up to be able to deal with that...
Name: "dns-sctp",
Name: "dns-udp",
Port: 53,
Protocol: v1.ProtocolSCTP,
Protocol: v1.ProtocolUDP,
},
{
Name: "dns-tcp",
Port: 53,
Protocol: v1.ProtocolTCP,
// We use TargetPort on TCP but not SCTP to help
// disambiguate the output.
// We use TargetPort on TCP but not UDP/SCTP to
// help disambiguate the output.
TargetPort: intstr.FromInt32(5353),
},
{
Name: "dns-sctp",
Port: 53,
Protocol: v1.ProtocolSCTP,
},
}
}),
)
@ -591,15 +592,20 @@ func TestClusterIPGeneral(t *testing.T) {
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Name: ptr.To("dns-udp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
Protocol: ptr.To(v1.ProtocolUDP),
},
{
Name: ptr.To("dns-tcp"),
Port: ptr.To[int32](5353),
Protocol: ptr.To(v1.ProtocolTCP),
},
{
Name: ptr.To("dns-sctp"),
Port: ptr.To[int32](53),
Protocol: ptr.To(v1.ProtocolSCTP),
},
}
}),
)
@ -640,7 +646,7 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (TCP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolTCP,
destIP: "172.30.0.42",
@ -649,7 +655,16 @@ func TestClusterIPGeneral(t *testing.T) {
masq: false,
},
{
name: "clusterIP with TCP and SCTP on same port (SCTP)",
name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolUDP,
destIP: "172.30.0.42",
destPort: 53,
output: "10.180.0.1:53, 10.180.2.1:53",
masq: false,
},
{
name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)",
sourceIP: "10.180.0.2",
protocol: v1.ProtocolSCTP,
destIP: "172.30.0.42",