mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-04 23:17:50 +00:00
move UDP conntrack operations together to pkg/proxy/util/conntrack.go
This commit is contained in:
@@ -959,8 +959,6 @@ func (esp *endpointServicePair) IPPart() string {
|
||||
return esp.endpoint
|
||||
}
|
||||
|
||||
const noConnectionToDelete = "0 flow entries have been deleted"
|
||||
|
||||
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
|
||||
// risk sending more traffic to it, all of which will be lost (because UDP).
|
||||
// This assumes the proxier mutex is held
|
||||
@@ -968,13 +966,9 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
|
||||
for epSvcPair := range connectionMap {
|
||||
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
|
||||
endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")]
|
||||
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
|
||||
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
|
||||
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
|
||||
// is expensive to baby sit all udp connections to kubernetes services.
|
||||
glog.Errorf("conntrack return with error: %v", err)
|
||||
err := utilproxy.ClearUDPConntrackForPeers(proxier.exec, svcInfo.clusterIP.String(), endpointIP)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.servicePortName.String(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1373,7 +1367,14 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
if lp.protocol == "udp" {
|
||||
proxier.clearUDPConntrackForPort(lp.port)
|
||||
// TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
|
||||
// This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
|
||||
// This only affects UDP connections, which are not common.
|
||||
// See issue: https://github.com/kubernetes/kubernetes/issues/49881
|
||||
err := utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.port)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.port, err)
|
||||
}
|
||||
}
|
||||
replacementPortsMap[lp] = socket
|
||||
} // We're holding the port, so it's OK to install iptables rules.
|
||||
@@ -1642,26 +1643,13 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Finish housekeeping.
|
||||
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
|
||||
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
|
||||
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
|
||||
}
|
||||
|
||||
// Clear UDP conntrack for port or all conntrack entries when port equal zero.
|
||||
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
|
||||
// The solution is clearing the conntrack. Known issus:
|
||||
// https://github.com/docker/docker/issues/8795
|
||||
// https://github.com/kubernetes/kubernetes/issues/31983
|
||||
func (proxier *Proxier) clearUDPConntrackForPort(port int) {
|
||||
glog.V(2).Infof("Deleting conntrack entries for udp connections")
|
||||
if port > 0 {
|
||||
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
|
||||
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
|
||||
glog.Errorf("conntrack return with error: %v", err)
|
||||
// TODO: these could be made more consistent.
|
||||
for _, svcIP := range staleServices.List() {
|
||||
if err := utilproxy.ClearUDPConntrackForIP(proxier.exec, svcIP); err != nil {
|
||||
glog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
|
||||
}
|
||||
} else {
|
||||
glog.Errorf("Wrong port number. The port number must be greater than zero")
|
||||
}
|
||||
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
|
||||
}
|
||||
|
||||
// Join all words with spaces, terminate with newline and write to buf.
|
||||
|
||||
Reference in New Issue
Block a user