From aa8521df666c0cc8f65da7150a1961cf3277fc44 Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Wed, 4 Mar 2020 14:49:33 +0100 Subject: [PATCH] Issue #70020; Flush Conntrack entities for SCTP Signed-off-by: Lars Ekman --- pkg/proxy/iptables/proxier.go | 19 +++---- pkg/proxy/iptables/proxier_test.go | 10 ++-- pkg/proxy/ipvs/graceful_termination.go | 5 +- pkg/proxy/ipvs/proxier.go | 15 +++--- pkg/util/conntrack/conntrack.go | 11 ++-- pkg/util/conntrack/conntrack_test.go | 74 +++++++++++++++++++------- pkg/util/ipvs/ipvs.go | 6 +++ 7 files changed, 93 insertions(+), 47 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e223f0f9e8f..b4aeb89a06c 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -722,32 +722,33 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } -// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost (because UDP). +// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost. // This assumes the proxier mutex is held // TODO: move it to util func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() + svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) } else { - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) } if err != nil { klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) if err != nil { klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) if err != nil { klog.Errorf("Failed to delete %s endpoint connections for LoabBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err) } @@ -808,8 +809,8 @@ func (proxier *Proxier) syncProxyRules() { staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { - klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String()) + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index f1fc7af1418..797e7157f0a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -230,7 +230,7 @@ func TestDeleteEndpointConnections(t *testing.T) { } // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP connections, since no conntrack cleanup is needed for TCP + // invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP fcmd := fakeexec.FakeCmd{} fexec := fakeexec.FakeExec{ LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, @@ -239,7 +239,7 @@ func TestDeleteEndpointConnections(t *testing.T) { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } for _, tc := range testCases { - if tc.protocol == UDP { + if conntrack.IsClearConntrackNeeded(tc.protocol) { var cmdOutput string var simErr error if tc.simulatedErr == "" { @@ -292,15 +292,15 @@ func TestDeleteEndpointConnections(t *testing.T) { fp.deleteEndpointConnections(input) - // For UDP connections, check the executed conntrack command + // For UDP and SCTP connections, check the executed conntrack command var expExecs int - if tc.protocol == UDP { + if conntrack.IsClearConntrackNeeded(tc.protocol) { isIPv6 := func(ip string) bool { netIP := net.ParseIP(ip) return netIP.To4() == nil } endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) if isIPv6(endpointIP) { expectCommand += " -f ipv6" } diff --git a/pkg/proxy/ipvs/graceful_termination.go b/pkg/proxy/ipvs/graceful_termination.go index c7f0897fad1..2a6d334727c 100644 --- a/pkg/proxy/ipvs/graceful_termination.go +++ b/pkg/proxy/ipvs/graceful_termination.go @@ -18,7 +18,6 @@ package ipvs import ( "fmt" - "strings" "sync" "time" @@ -164,10 +163,10 @@ func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, e } for _, rs := range rss { if rsToDelete.RealServer.Equal(rs) { - // For UDP traffic, no graceful termination, we immediately delete the RS + // For UDP and SCTP traffic, no graceful termination, we immediately delete the RS // (existing connections will be deleted on the next packet because sysctlExpireNoDestConn=1) // For other protocols, don't delete until all connections have expired) - if strings.ToUpper(rsToDelete.VirtualServer.Protocol) != "UDP" && rs.ActiveConn+rs.InactiveConn != 0 { + if utilipvs.IsRsGracefulTerminationNeeded(rsToDelete.VirtualServer.Protocol) && rs.ActiveConn+rs.InactiveConn != 0 { klog.V(5).Infof("Not deleting, RS %v: %v ActiveConn, %v InactiveConn", rsToDelete.String(), rs.ActiveConn, rs.InactiveConn) return false, nil } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 5b3eda42960..508e235676a 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1023,8 +1023,8 @@ func (proxier *Proxier) syncProxyRules() { staleServices := serviceUpdateResult.UDPStaleClusterIP // merge stale services gathered from updateEndpointsMap for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { - klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String()) + if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { + klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) @@ -1842,25 +1842,26 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl return nil } -// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP { + if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) - err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) + svcProto := svcInfo.Protocol() + err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) if err != nil { klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) if err != nil { klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) if err != nil { klog.Errorf("Failed to delete %s endpoint connections for LoadBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err) } diff --git a/pkg/util/conntrack/conntrack.go b/pkg/util/conntrack/conntrack.go index 06b76edc6dc..204e05dd1e5 100644 --- a/pkg/util/conntrack/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -21,7 +21,7 @@ import ( "strconv" "strings" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/utils/exec" utilnet "k8s.io/utils/net" ) @@ -103,7 +103,7 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. - return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err) + return fmt.Errorf("error deleting conntrack entries for %s peer {%s, %s}, error: %v", protoStr(protocol), origin, dest, err) } return nil } @@ -119,7 +119,12 @@ func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protoc parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { - return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) + return fmt.Errorf("error deleting conntrack entries for %s port: %d, error: %v", protoStr(protocol), port, err) } return nil } + +// IsClearConntrackNeeded returns true if protocol requires conntrack cleanup for the stale connections +func IsClearConntrackNeeded(proto v1.Protocol) bool { + return proto == v1.ProtocolUDP || proto == v1.ProtocolSCTP +} diff --git a/pkg/util/conntrack/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go index 6ceeb510f6b..bed250d478b 100644 --- a/pkg/util/conntrack/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -21,7 +21,7 @@ import ( "strings" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" utilnet "k8s.io/utils/net" @@ -177,7 +177,7 @@ func TestClearUDPConntrackForPort(t *testing.T) { } } -func TestDeleteUDPConnections(t *testing.T) { +func TestDeleteConnections(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, @@ -185,6 +185,11 @@ func TestDeleteUDPConnections(t *testing.T) { return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, + func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, + func() ([]byte, []byte, error) { + return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") + }, + func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := fakeexec.FakeExec{ @@ -192,6 +197,9 @@ func TestDeleteUDPConnections(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } @@ -200,30 +208,52 @@ func TestDeleteUDPConnections(t *testing.T) { name string origin string dest string + proto v1.Protocol }{ { - name: "IPv4 success", + name: "UDP IPv4 success", origin: "1.2.3.4", dest: "10.20.30.40", + proto: v1.ProtocolUDP, }, { - name: "IPv4 simulated failure", + name: "UDP IPv4 simulated failure", origin: "2.3.4.5", dest: "20.30.40.50", + proto: v1.ProtocolUDP, }, { - name: "IPv6 success", + name: "UDP IPv6 success", origin: "fd00::600d:f00d", dest: "2001:db8::5", + proto: v1.ProtocolUDP, + }, + { + name: "SCTP IPv4 success", + origin: "1.2.3.5", + dest: "10.20.30.50", + proto: v1.ProtocolSCTP, + }, + { + name: "SCTP IPv4 simulated failure", + origin: "2.3.4.6", + dest: "20.30.40.60", + proto: v1.ProtocolSCTP, + }, + { + name: "SCTP IPv6 success", + origin: "fd00::600d:f00d", + dest: "2001:db8::6", + proto: v1.ProtocolSCTP, }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForNAT(&fexec, tc.origin, tc.dest, v1.ProtocolUDP) + err := ClearEntriesForNAT(&fexec, tc.origin, tc.dest, tc.proto) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin)) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.origin, tc.dest, protoStr(tc.proto)) + familyParamStr(utilnet.IsIPv6String(tc.origin)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) @@ -235,13 +265,10 @@ func TestDeleteUDPConnections(t *testing.T) { } } -func TestClearUDPConntrackForPortNAT(t *testing.T) { +func TestClearConntrackForPortNAT(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { - return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") - }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } @@ -249,28 +276,35 @@ func TestClearUDPConntrackForPortNAT(t *testing.T) { CommandScript: []fakeexec.FakeCommandAction{ func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } testCases := []struct { - name string - port int - dest string + name string + port int + dest string + proto v1.Protocol }{ { - name: "IPv4 success", - port: 30211, - dest: "1.2.3.4", + name: "UDP IPv4 success", + port: 30211, + dest: "1.2.3.4", + proto: v1.ProtocolUDP, + }, + { + name: "SCTP IPv4 success", + port: 30215, + dest: "1.2.3.5", + proto: v1.ProtocolSCTP, }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, v1.ProtocolUDP) + err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, tc.proto) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) + expectCommand := fmt.Sprintf("conntrack -D -p %s --dport %d --dst-nat %s", protoStr(tc.proto), tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go index 13eef323c70..a9bb32f1be6 100644 --- a/pkg/util/ipvs/ipvs.go +++ b/pkg/util/ipvs/ipvs.go @@ -19,6 +19,7 @@ package ipvs import ( "net" "strconv" + "strings" "time" "k8s.io/apimachinery/pkg/util/version" @@ -133,3 +134,8 @@ func GetRequiredIPVSModules(kernelVersion *version.Version) []string { return []string{KernelModuleIPVS, KernelModuleIPVSRR, KernelModuleIPVSWRR, KernelModuleIPVSSH, KernelModuleNfConntrack} } + +// IsRsGracefulTerminationNeeded returns true if protocol requires graceful termination for the stale connections +func IsRsGracefulTerminationNeeded(proto string) bool { + return !strings.EqualFold(proto, "UDP") && !strings.EqualFold(proto, "SCTP") +}