Merge pull request #116104 from daman1807/conntrack

Unified Conntrack Cleaning
This commit is contained in:
Kubernetes Prow Robot 2023-04-19 00:42:47 -07:00 committed by GitHub
commit ff4eff24ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 120 additions and 167 deletions

View File

@ -0,0 +1,111 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package conntrack
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/utils/exec"
)
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
func CleanStaleEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap,
serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) {
deleteStaleServiceConntrackEntries(isIPv6, exec, svcPortMap, serviceUpdateResult, endpointUpdateResult)
deleteStaleEndpointConntrackEntries(exec, svcPortMap, endpointUpdateResult)
}
// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
// may create "black hole" entries for that IP+port. When the service gets endpoints we
// need to delete those entries so further traffic doesn't get dropped.
func deleteStaleServiceConntrackEntries(isIPv6 bool, exec utilexec.Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointUpdateResult proxy.UpdateEndpointMapResult) {
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.New[int]()
// merge newly active services gathered from updateEndpointsMap
// a UDP service that changes from 0 to non-0 endpoints is newly active.
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := svcPortMap[svcPortName]; ok {
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP)
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP)
}
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
conntrackCleanupServiceNodePorts.Insert(nodePort)
}
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := ClearEntriesForIP(exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := ClearEntriesForPort(exec, nodePort, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
}
}
}
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
func deleteStaleEndpointConntrackEntries(exec utilexec.Interface, svcPortMap proxy.ServicePortMap, endpointUpdateResult proxy.UpdateEndpointMapResult) {
for _, epSvcPair := range endpointUpdateResult.DeletedUDPEndpoints {
if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
var err error
if nodePort != 0 {
err = ClearEntriesForPortNAT(exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = ClearEntriesForNAT(exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := ClearEntriesForNAT(exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := ClearEntriesForNAT(exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}
}
}
}
}

View File

@ -63,12 +63,12 @@ func Exec(execer exec.Interface, parameters ...string) error {
if err != nil {
return fmt.Errorf("error looking for path of conntrack: %v", err)
}
klog.V(4).Infof("Clearing conntrack entries %v", parameters)
klog.V(4).InfoS("Clearing conntrack entries", "parameters", parameters)
output, err := execer.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err)
}
klog.V(4).Infof("Conntrack entries deleted %s", string(output))
klog.V(4).InfoS("Conntrack entries deleted", "output", string(output))
return nil
}

View File

@ -44,13 +44,13 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilexec "k8s.io/utils/exec"
netutils "k8s.io/utils/net"
@ -744,42 +744,6 @@ func isServiceChainName(chainString string) bool {
return false
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
// TODO: move it to util
func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) {
for _, epSvcPair := range deletedUDPEndpoints {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}
}
}
}
}
// Assumes proxier.mu is held.
func (proxier *Proxier) appendServiceCommentLocked(args []string, svcName string) []string {
// Not printing these comments, can reduce size of iptables (in case of large
@ -831,29 +795,6 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
// We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok {
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP)
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP)
}
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
conntrackCleanupServiceNodePorts.Insert(nodePort)
}
}
}
klog.V(2).InfoS("Syncing iptables rules")
success := false
@ -1629,24 +1570,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping.
// Clear stale conntrack entries for UDP Services, this has to be done AFTER the iptables rules are programmed.
// TODO: these could be made more consistent.
klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, isIPv6, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
}
}
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints)
proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints)
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {

View File

@ -44,6 +44,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/metrics"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
@ -51,7 +52,6 @@ import (
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
utilproxytest "k8s.io/kubernetes/pkg/proxy/util/testing"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec"

View File

@ -43,13 +43,13 @@ import (
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/conntrack"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
@ -946,29 +946,6 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
// We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok {
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP)
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
conntrackCleanupServiceIPs.Insert(lbIP)
}
nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
conntrackCleanupServiceNodePorts.Insert(nodePort)
}
}
}
klog.V(3).InfoS("Syncing ipvs proxier rules")
proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
@ -1528,24 +1505,8 @@ func (proxier *Proxier) syncProxyRules() {
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
// Finish housekeeping.
// Clear stale conntrack entries for UDP Services, this has to be done AFTER the ipvs rules are programmed.
// TODO: these could be made more consistent.
klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
}
}
klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
err := conntrack.ClearEntriesForPort(proxier.exec, nodePort, proxier.ipFamily == v1.IPv6Protocol, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
}
}
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints)
proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints)
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
}
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
@ -1817,42 +1778,6 @@ func (proxier *Proxier) createAndLinkKubeChain() {
}
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
// TODO: move it to util
func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) {
for _, epSvcPair := range deletedUDPEndpoints {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort()
var err error
if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
}
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
}
for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
}
}
for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
}
}
}
}
}
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error {
appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {

View File

@ -1,8 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-network-approvers
reviewers:
- sig-network-reviewers
labels:
- sig/network