diff --git a/pkg/proxy/conntrack/cleanup.go b/pkg/proxy/conntrack/cleanup.go index cd39c66cb0e..27b38685e5e 100644 --- a/pkg/proxy/conntrack/cleanup.go +++ b/pkg/proxy/conntrack/cleanup.go @@ -25,19 +25,13 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" proxyutil "k8s.io/kubernetes/pkg/proxy/util" - utilexec "k8s.io/utils/exec" + netutils "k8s.io/utils/net" ) // CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints. -func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, +func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - ct := &execCT{exec} - cleanStaleEntries(ct, isIPv6, svcPortMap, serviceUpdateResult, endpointsUpdateResult) -} - -func cleanStaleEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMap, - serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { - deleteStaleServiceConntrackEntries(ct, isIPv6, svcPortMap, serviceUpdateResult, endpointsUpdateResult) + deleteStaleServiceConntrackEntries(ct, svcPortMap, serviceUpdateResult, endpointsUpdateResult) deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult) } @@ -45,9 +39,10 @@ func cleanStaleEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMa // to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack // may create "black hole" entries for that IP+port. When the service gets endpoints we // need to delete those entries so further traffic doesn't get dropped. -func deleteStaleServiceConntrackEntries(ct Interface, isIPv6 bool, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { +func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) { conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.New[int]() + isIPv6 := false // merge newly active services gathered from endpointsUpdateResult // a UDP service that changes from 0 to non-0 endpoints is newly active. @@ -64,6 +59,7 @@ func deleteStaleServiceConntrackEntries(ct Interface, isIPv6 bool, svcPortMap pr nodePort := svcInfo.NodePort() if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { conntrackCleanupServiceNodePorts.Insert(nodePort) + isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP()) } } } diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go index 1a440f18963..781800280fb 100644 --- a/pkg/proxy/conntrack/cleanup_test.go +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -245,7 +245,7 @@ func TestCleanStaleEntries(t *testing.T) { for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { fake := NewFake() - cleanStaleEntries(fake, false, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates) + CleanStaleEntries(fake, svcPortMap, tc.serviceUpdates, tc.endpointsUpdates) if !fake.ClearedIPs.Equal(tc.result.ClearedIPs) { t.Errorf("Expected ClearedIPs=%v, got %v", tc.result.ClearedIPs, fake.ClearedIPs) } diff --git a/pkg/proxy/conntrack/conntrack.go b/pkg/proxy/conntrack/conntrack.go index dc64ec092cb..49aae1b94ea 100644 --- a/pkg/proxy/conntrack/conntrack.go +++ b/pkg/proxy/conntrack/conntrack.go @@ -56,6 +56,10 @@ type execCT struct { var _ Interface = &execCT{} +func NewExec(execer exec.Interface) Interface { + return &execCT{execer: execer} +} + // noConnectionToDelete is the error string returned by conntrack when no matching connections are found const noConnectionToDelete = "0 flow entries have been deleted" diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1719fefa229..766997bdfdc 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -170,7 +170,7 @@ type Proxier struct { iptables utiliptables.Interface masqueradeAll bool masqueradeMark string - exec utilexec.Interface + conntrack conntrack.Interface localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP @@ -283,7 +283,7 @@ func NewProxier(ipFamily v1.IPFamily, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, - exec: exec, + conntrack: conntrack.NewExec(exec), localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, @@ -1538,7 +1538,7 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 067e5cd93cf..cb434c1263c 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/healthcheck" @@ -57,7 +58,6 @@ import ( "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" - fakeexec "k8s.io/utils/exec/testing" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) @@ -113,7 +113,6 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { networkInterfacer.AddInterfaceAddr(&itf1, addrs1) p := &Proxier{ - exec: &fakeexec.FakeExec{}, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), @@ -121,6 +120,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { needFullSync: true, iptables: ipt, masqueradeMark: "0x4000", + conntrack: conntrack.NewFake(), localDetector: detectLocal, hostname: testHostname, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), @@ -1915,22 +1915,23 @@ func TestClusterIPGeneral(t *testing.T) { TargetPort: intstr.FromInt32(8443), }, { - // Of course this should really be UDP, but if we - // create a service with UDP ports, the Proxier will - // try to do conntrack cleanup and we'd have to set - // the FakeExec up to be able to deal with that... - Name: "dns-sctp", + Name: "dns-udp", Port: 53, - Protocol: v1.ProtocolSCTP, + Protocol: v1.ProtocolUDP, }, { Name: "dns-tcp", Port: 53, Protocol: v1.ProtocolTCP, - // We use TargetPort on TCP but not SCTP to help - // disambiguate the output. + // We use TargetPort on TCP but not UDP/SCTP to + // help disambiguate the output. TargetPort: intstr.FromInt32(5353), }, + { + Name: "dns-sctp", + Port: 53, + Protocol: v1.ProtocolSCTP, + }, } }), ) @@ -1972,15 +1973,20 @@ func TestClusterIPGeneral(t *testing.T) { Protocol: ptr.To(v1.ProtocolTCP), }, { - Name: ptr.To("dns-sctp"), + Name: ptr.To("dns-udp"), Port: ptr.To[int32](53), - Protocol: ptr.To(v1.ProtocolSCTP), + Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("dns-tcp"), Port: ptr.To[int32](5353), Protocol: ptr.To(v1.ProtocolTCP), }, + { + Name: ptr.To("dns-sctp"), + Port: ptr.To[int32](53), + Protocol: ptr.To(v1.ProtocolSCTP), + }, } }), ) @@ -2021,7 +2027,7 @@ func TestClusterIPGeneral(t *testing.T) { masq: false, }, { - name: "clusterIP with TCP and SCTP on same port (TCP)", + name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)", sourceIP: "10.180.0.2", protocol: v1.ProtocolTCP, destIP: "172.30.0.42", @@ -2030,7 +2036,16 @@ func TestClusterIPGeneral(t *testing.T) { masq: false, }, { - name: "clusterIP with TCP and SCTP on same port (SCTP)", + name: "clusterIP with TCP, UDP, and SCTP on same port (UDP)", + sourceIP: "10.180.0.2", + protocol: v1.ProtocolUDP, + destIP: "172.30.0.42", + destPort: 53, + output: "10.180.0.1:53, 10.180.2.1:53", + masq: false, + }, + { + name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)", sourceIP: "10.180.0.2", protocol: v1.ProtocolSCTP, destIP: "172.30.0.42", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 6643c432d3c..a5d99c48d11 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -256,7 +256,7 @@ type Proxier struct { iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface - exec utilexec.Interface + conntrack conntrack.Interface masqueradeAll bool masqueradeMark string localDetector proxyutiliptables.LocalTrafficDetector @@ -433,7 +433,7 @@ func NewProxier(ipFamily v1.IPFamily, iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, - exec: exec, + conntrack: conntrack.NewExec(exec), localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, @@ -1482,7 +1482,7 @@ func (proxier *Proxier) syncProxyRules() { metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 360fc051782..33af6d72b61 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -41,6 +41,7 @@ import ( "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/healthcheck" utilipset "k8s.io/kubernetes/pkg/proxy/ipvs/ipset" ipsettest "k8s.io/kubernetes/pkg/proxy/ipvs/ipset/testing" @@ -54,8 +55,6 @@ import ( "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" - "k8s.io/utils/exec" - fakeexec "k8s.io/utils/exec/testing" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) @@ -131,26 +130,12 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol) netlinkHandle.SetLocalAddresses("eth0", nodeIPs...) - fcmd := fakeexec.FakeCmd{ - CombinedOutputScript: []fakeexec.FakeAction{ - func() ([]byte, []byte, error) { return []byte("dummy device have been created"), nil, nil }, - func() ([]byte, []byte, error) { return []byte(""), nil, nil }, - }, - } - fexec := &fakeexec.FakeExec{ - CommandScript: []fakeexec.FakeCommandAction{ - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - }, - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } // initialize ipsetList with all sets we needed ipsetList := make(map[string]*IPSet) for _, is := range ipsetInfo { ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, false, is.comment) } p := &Proxier{ - exec: fexec, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), @@ -159,6 +144,7 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u iptables: ipt, ipvs: ipvs, ipset: ipset, + conntrack: conntrack.NewFake(), strictARP: false, localDetector: proxyutiliptables.NewNoOpLocalDetector(), hostname: testHostname, diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 5741e0f05aa..896c1232bd6 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -162,7 +162,7 @@ type Proxier struct { nftables knftables.Interface masqueradeAll bool masqueradeMark string - exec utilexec.Interface + conntrack conntrack.Interface localDetector proxyutiliptables.LocalTrafficDetector hostname string nodeIP net.IP @@ -236,7 +236,7 @@ func NewProxier(ipFamily v1.IPFamily, nftables: nft, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, - exec: utilexec.New(), + conntrack: conntrack.NewExec(utilexec.New()), localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, @@ -1548,7 +1548,7 @@ func (proxier *Proxier) syncProxyRules() { } // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) + conntrack.CleanStaleEntries(proxier.conntrack, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) { diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index f1486401c75..66132ba3b40 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" + "k8s.io/kubernetes/pkg/proxy/conntrack" "k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/healthcheck" @@ -46,7 +47,6 @@ import ( proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" - fakeexec "k8s.io/utils/exec/testing" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) @@ -105,13 +105,13 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { p := &Proxier{ ipFamily: ipFamily, - exec: &fakeexec.FakeExec{}, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil), nftables: nft, masqueradeMark: "0x4000", + conntrack: conntrack.NewFake(), localDetector: detectLocal, hostname: testHostname, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), @@ -534,22 +534,23 @@ func TestClusterIPGeneral(t *testing.T) { TargetPort: intstr.FromInt32(8443), }, { - // Of course this should really be UDP, but if we - // create a service with UDP ports, the Proxier will - // try to do conntrack cleanup and we'd have to set - // the FakeExec up to be able to deal with that... - Name: "dns-sctp", + Name: "dns-udp", Port: 53, - Protocol: v1.ProtocolSCTP, + Protocol: v1.ProtocolUDP, }, { Name: "dns-tcp", Port: 53, Protocol: v1.ProtocolTCP, - // We use TargetPort on TCP but not SCTP to help - // disambiguate the output. + // We use TargetPort on TCP but not UDP/SCTP to + // help disambiguate the output. TargetPort: intstr.FromInt32(5353), }, + { + Name: "dns-sctp", + Port: 53, + Protocol: v1.ProtocolSCTP, + }, } }), ) @@ -591,15 +592,20 @@ func TestClusterIPGeneral(t *testing.T) { Protocol: ptr.To(v1.ProtocolTCP), }, { - Name: ptr.To("dns-sctp"), + Name: ptr.To("dns-udp"), Port: ptr.To[int32](53), - Protocol: ptr.To(v1.ProtocolSCTP), + Protocol: ptr.To(v1.ProtocolUDP), }, { Name: ptr.To("dns-tcp"), Port: ptr.To[int32](5353), Protocol: ptr.To(v1.ProtocolTCP), }, + { + Name: ptr.To("dns-sctp"), + Port: ptr.To[int32](53), + Protocol: ptr.To(v1.ProtocolSCTP), + }, } }), ) @@ -640,7 +646,7 @@ func TestClusterIPGeneral(t *testing.T) { masq: false, }, { - name: "clusterIP with TCP and SCTP on same port (TCP)", + name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)", sourceIP: "10.180.0.2", protocol: v1.ProtocolTCP, destIP: "172.30.0.42", @@ -649,7 +655,16 @@ func TestClusterIPGeneral(t *testing.T) { masq: false, }, { - name: "clusterIP with TCP and SCTP on same port (SCTP)", + name: "clusterIP with TCP, UDP, and SCTP on same port (TCP)", + sourceIP: "10.180.0.2", + protocol: v1.ProtocolUDP, + destIP: "172.30.0.42", + destPort: 53, + output: "10.180.0.1:53, 10.180.2.1:53", + masq: false, + }, + { + name: "clusterIP with TCP, UDP, and SCTP on same port (SCTP)", sourceIP: "10.180.0.2", protocol: v1.ProtocolSCTP, destIP: "172.30.0.42",