Issue #70020; Flush Conntrack entities for SCTP

Signed-off-by: Lars Ekman <lars.g.ekman@est.tech>
This commit is contained in:
Lars Ekman
2020-03-04 14:49:33 +01:00
parent cb38560422
commit aa8521df66
7 changed files with 93 additions and 47 deletions

View File

@@ -722,32 +722,33 @@ func servicePortEndpointChainName(servicePortName string, protocol string, endpo
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost.
// This assumes the proxier mutex is held
// TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto)
} else {
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
}
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for LoabBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
}
@@ -808,8 +809,8 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)

View File

@@ -230,7 +230,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
}
// Create a fake executor for the conntrack utility. This should only be
// invoked for UDP connections, since no conntrack cleanup is needed for TCP
// invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{}
fexec := fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
@@ -239,7 +239,7 @@ func TestDeleteEndpointConnections(t *testing.T) {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases {
if tc.protocol == UDP {
if conntrack.IsClearConntrackNeeded(tc.protocol) {
var cmdOutput string
var simErr error
if tc.simulatedErr == "" {
@@ -292,15 +292,15 @@ func TestDeleteEndpointConnections(t *testing.T) {
fp.deleteEndpointConnections(input)
// For UDP connections, check the executed conntrack command
// For UDP and SCTP connections, check the executed conntrack command
var expExecs int
if tc.protocol == UDP {
if conntrack.IsClearConntrackNeeded(tc.protocol) {
isIPv6 := func(ip string) bool {
netIP := net.ParseIP(ip)
return netIP.To4() == nil
}
endpointIP := utilproxy.IPPart(tc.endpoint)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol))))
if isIPv6(endpointIP) {
expectCommand += " -f ipv6"
}

View File

@@ -18,7 +18,6 @@ package ipvs
import (
"fmt"
"strings"
"sync"
"time"
@@ -164,10 +163,10 @@ func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, e
}
for _, rs := range rss {
if rsToDelete.RealServer.Equal(rs) {
// For UDP traffic, no graceful termination, we immediately delete the RS
// For UDP and SCTP traffic, no graceful termination, we immediately delete the RS
// (existing connections will be deleted on the next packet because sysctlExpireNoDestConn=1)
// For other protocols, don't delete until all connections have expired)
if strings.ToUpper(rsToDelete.VirtualServer.Protocol) != "UDP" && rs.ActiveConn+rs.InactiveConn != 0 {
if utilipvs.IsRsGracefulTerminationNeeded(rsToDelete.VirtualServer.Protocol) && rs.ActiveConn+rs.InactiveConn != 0 {
klog.V(5).Infof("Not deleting, RS %v: %v ActiveConn, %v InactiveConn", rsToDelete.String(), rs.ActiveConn, rs.InactiveConn)
return false, nil
}

View File

@@ -1023,8 +1023,8 @@ func (proxier *Proxier) syncProxyRules() {
staleServices := serviceUpdateResult.UDPStaleClusterIP
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIP().String())
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
@@ -1842,25 +1842,26 @@ func (proxier *Proxier) getExistingChains(buffer *bytes.Buffer, table utiliptabl
return nil
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
svcProto := svcInfo.Protocol()
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for externalIP %s, error: %v", epSvcPair.ServicePortName.String(), extIP, err)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto)
if err != nil {
klog.Errorf("Failed to delete %s endpoint connections for LoadBalancerIP %s, error: %v", epSvcPair.ServicePortName.String(), lbIP, err)
}