Simplify nftables/proxier.go by using string rather than utiliptables.Chain

Change the svcPortInfo and endpointInfo fields to string rather than
utiliptables.Chain, and various fixups from there.

Also use a proper set for activeNATChains, and fix the capitalization
of endpointInfo.chainName.
This commit is contained in:
Dan Winship
2023-05-18 11:02:13 -04:00
parent 96e53f64f4
commit 3abdda9800

View File

@@ -36,6 +36,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1" discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl" utilsysctl "k8s.io/component-helpers/node/util/sysctl"
@@ -55,25 +56,25 @@ import (
const ( const (
// the services chain // the services chain
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" kubeServicesChain = "KUBE-SERVICES"
// the external services chain // the external services chain
kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES" kubeExternalServicesChain = "KUBE-EXTERNAL-SERVICES"
// the nodeports chain // the nodeports chain
kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS" kubeNodePortsChain = "KUBE-NODEPORTS"
// the kubernetes postrouting chain // the kubernetes postrouting chain
kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" kubePostroutingChain = "KUBE-POSTROUTING"
// kubeMarkMasqChain is the mark-for-masquerade chain // kubeMarkMasqChain is the mark-for-masquerade chain
kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ" kubeMarkMasqChain = "KUBE-MARK-MASQ"
// the kubernetes forward chain // the kubernetes forward chain
kubeForwardChain utiliptables.Chain = "KUBE-FORWARD" kubeForwardChain = "KUBE-FORWARD"
// kubeProxyFirewallChain is the kube-proxy firewall chain // kubeProxyFirewallChain is the kube-proxy firewall chain
kubeProxyFirewallChain utiliptables.Chain = "KUBE-PROXY-FIREWALL" kubeProxyFirewallChain = "KUBE-PROXY-FIREWALL"
) )
const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal" const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal"
@@ -83,10 +84,10 @@ type servicePortInfo struct {
*proxy.BaseServicePortInfo *proxy.BaseServicePortInfo
// The following fields are computed and stored for performance reasons. // The following fields are computed and stored for performance reasons.
nameString string nameString string
clusterPolicyChainName utiliptables.Chain clusterPolicyChainName string
localPolicyChainName utiliptables.Chain localPolicyChainName string
firewallChainName utiliptables.Chain firewallChainName string
externalChainName utiliptables.Chain externalChainName string
} }
// returns a new proxy.ServicePort which abstracts a serviceInfo // returns a new proxy.ServicePort which abstracts a serviceInfo
@@ -110,14 +111,14 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro
type endpointInfo struct { type endpointInfo struct {
*proxy.BaseEndpointInfo *proxy.BaseEndpointInfo
ChainName utiliptables.Chain chainName string
} }
// returns a new proxy.Endpoint which abstracts a endpointInfo // returns a new proxy.Endpoint which abstracts a endpointInfo
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint { func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
return &endpointInfo{ return &endpointInfo{
BaseEndpointInfo: baseInfo, BaseEndpointInfo: baseInfo,
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()), chainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
} }
} }
@@ -627,37 +628,37 @@ const (
// servicePortPolicyClusterChain returns the name of the KUBE-SVC-XXXX chain for a service, which is the // servicePortPolicyClusterChain returns the name of the KUBE-SVC-XXXX chain for a service, which is the
// main iptables chain for that service, used for dispatching to endpoints when using `Cluster` // main iptables chain for that service, used for dispatching to endpoints when using `Cluster`
// traffic policy. // traffic policy.
func servicePortPolicyClusterChain(servicePortName string, protocol string) utiliptables.Chain { func servicePortPolicyClusterChain(servicePortName string, protocol string) string {
return utiliptables.Chain(servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol)) return servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol)
} }
// servicePortPolicyLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which // servicePortPolicyLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which
// handles dispatching to local endpoints when using `Local` traffic policy. This chain only // handles dispatching to local endpoints when using `Local` traffic policy. This chain only
// exists if the service has `Local` internal or external traffic policy. // exists if the service has `Local` internal or external traffic policy.
func servicePortPolicyLocalChainName(servicePortName string, protocol string) utiliptables.Chain { func servicePortPolicyLocalChainName(servicePortName string, protocol string) string {
return utiliptables.Chain(servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol)) return servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol)
} }
// serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which // serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which
// is used to implement the filtering for the LoadBalancerSourceRanges feature. // is used to implement the filtering for the LoadBalancerSourceRanges feature.
func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain { func serviceFirewallChainName(servicePortName string, protocol string) string {
return utiliptables.Chain(serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol)) return serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol)
} }
// serviceExternalChainName returns the name of the KUBE-EXT-XXXX chain for a service, which // serviceExternalChainName returns the name of the KUBE-EXT-XXXX chain for a service, which
// implements "short-circuiting" for internally-originated external-destination traffic when using // implements "short-circuiting" for internally-originated external-destination traffic when using
// `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX // `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX
// chain and traffic from external sources to the KUBE-SVL-XXXX chain. // chain and traffic from external sources to the KUBE-SVL-XXXX chain.
func serviceExternalChainName(servicePortName string, protocol string) utiliptables.Chain { func serviceExternalChainName(servicePortName string, protocol string) string {
return utiliptables.Chain(serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol)) return serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol)
} }
// servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular // servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular
// service endpoint. // service endpoint.
func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain { func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) string {
hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint)) hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
encoded := base32.StdEncoding.EncodeToString(hash[:]) encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain(servicePortEndpointChainNamePrefix + encoded[:16]) return servicePortEndpointChainNamePrefix + encoded[:16]
} }
func isServiceChainName(chainString string) bool { func isServiceChainName(chainString string) bool {
@@ -777,7 +778,7 @@ func (proxier *Proxier) syncProxyRules() {
) )
// Accumulate NAT chains to keep. // Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set activeNATChains := sets.New[string]()
// Compute total number of endpoint chains across all services // Compute total number of endpoint chains across all services
// to get a sense of how big the cluster is. // to get a sense of how big the cluster is.
@@ -811,7 +812,7 @@ func (proxier *Proxier) syncProxyRules() {
// Note the endpoint chains that will be used // Note the endpoint chains that will be used
for _, ep := range allLocallyReachableEndpoints { for _, ep := range allLocallyReachableEndpoints {
if epInfo, ok := ep.(*endpointInfo); ok { if epInfo, ok := ep.(*endpointInfo); ok {
activeNATChains[epInfo.ChainName] = true activeNATChains.Insert(epInfo.chainName)
} }
} }
@@ -819,14 +820,14 @@ func (proxier *Proxier) syncProxyRules() {
clusterPolicyChain := svcInfo.clusterPolicyChainName clusterPolicyChain := svcInfo.clusterPolicyChainName
usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints() usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
if usesClusterPolicyChain { if usesClusterPolicyChain {
activeNATChains[clusterPolicyChain] = true activeNATChains.Insert(clusterPolicyChain)
} }
// localPolicyChain contains the endpoints used with "Local" traffic policy // localPolicyChain contains the endpoints used with "Local" traffic policy
localPolicyChain := svcInfo.localPolicyChainName localPolicyChain := svcInfo.localPolicyChainName
usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints() usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
if usesLocalPolicyChain { if usesLocalPolicyChain {
activeNATChains[localPolicyChain] = true activeNATChains.Insert(localPolicyChain)
} }
// internalPolicyChain is the chain containing the endpoints for // internalPolicyChain is the chain containing the endpoints for
@@ -868,7 +869,7 @@ func (proxier *Proxier) syncProxyRules() {
// are no externally-usable endpoints. // are no externally-usable endpoints.
usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible() usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
if usesExternalTrafficChain { if usesExternalTrafficChain {
activeNATChains[externalTrafficChain] = true activeNATChains.Insert(externalTrafficChain)
} }
// Traffic to LoadBalancer IPs can go directly to externalTrafficChain // Traffic to LoadBalancer IPs can go directly to externalTrafficChain
@@ -878,7 +879,7 @@ func (proxier *Proxier) syncProxyRules() {
fwChain := svcInfo.firewallChainName fwChain := svcInfo.firewallChainName
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0 usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
if usesFWChain { if usesFWChain {
activeNATChains[fwChain] = true activeNATChains.Insert(fwChain)
loadBalancerTrafficChain = fwChain loadBalancerTrafficChain = fwChain
} }
@@ -1061,7 +1062,7 @@ func (proxier *Proxier) syncProxyRules() {
// jump to externalTrafficChain, which will handle some special cases and // jump to externalTrafficChain, which will handle some special cases and
// then jump to externalPolicyChain. // then jump to externalPolicyChain.
if usesExternalTrafficChain { if usesExternalTrafficChain {
proxier.natChains.Write(utiliptables.MakeChainLine(externalTrafficChain)) proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(externalTrafficChain)))
if !svcInfo.ExternalPolicyLocal() { if !svcInfo.ExternalPolicyLocal() {
// If we are using non-local endpoints we need to masquerade, // If we are using non-local endpoints we need to masquerade,
@@ -1115,7 +1116,7 @@ func (proxier *Proxier) syncProxyRules() {
// Set up firewall chain, if needed // Set up firewall chain, if needed
if usesFWChain { if usesFWChain {
proxier.natChains.Write(utiliptables.MakeChainLine(fwChain)) proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(fwChain)))
// The service firewall rules are created based on the // The service firewall rules are created based on the
// loadBalancerSourceRanges field. This only works for VIP-like // loadBalancerSourceRanges field. This only works for VIP-like
@@ -1166,14 +1167,14 @@ func (proxier *Proxier) syncProxyRules() {
// If Cluster policy is in use, create the chain and create rules jumping // If Cluster policy is in use, create the chain and create rules jumping
// from clusterPolicyChain to the clusterEndpoints // from clusterPolicyChain to the clusterEndpoints
if usesClusterPolicyChain { if usesClusterPolicyChain {
proxier.natChains.Write(utiliptables.MakeChainLine(clusterPolicyChain)) proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(clusterPolicyChain)))
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints) proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
} }
// If Local policy is in use, create the chain and create rules jumping // If Local policy is in use, create the chain and create rules jumping
// from localPolicyChain to the localEndpoints // from localPolicyChain to the localEndpoints
if usesLocalPolicyChain { if usesLocalPolicyChain {
proxier.natChains.Write(utiliptables.MakeChainLine(localPolicyChain)) proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(localPolicyChain)))
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints) proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
} }
@@ -1185,11 +1186,11 @@ func (proxier *Proxier) syncProxyRules() {
continue continue
} }
endpointChain := epInfo.ChainName endpointChain := epInfo.chainName
// Create the endpoint chain // Create the endpoint chain
proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain)) proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(endpointChain)))
activeNATChains[endpointChain] = true activeNATChains.Insert(endpointChain)
// Handle traffic that loops back to the originator with SNAT. // Handle traffic that loops back to the originator with SNAT.
proxier.natRules.Write( proxier.natRules.Write(
@@ -1221,7 +1222,7 @@ func (proxier *Proxier) syncProxyRules() {
existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes()) existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
for chain := range existingNATChains { for chain := range existingNATChains {
if !activeNATChains[chain] { if !activeNATChains.Has(string(chain)) {
chainString := string(chain) chainString := string(chain)
if !isServiceChainName(chainString) { if !isServiceChainName(chainString) {
// Ignore chains that aren't ours. // Ignore chains that aren't ours.
@@ -1357,7 +1358,7 @@ func (proxier *Proxier) syncProxyRules() {
conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) conntrack.CleanStaleEntries(proxier.iptables.IsIPv6(), proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult)
} }
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint) { func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain string, endpoints []proxy.Endpoint) {
// First write session affinity rules, if applicable. // First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, ep := range endpoints { for _, ep := range endpoints {
@@ -1369,9 +1370,9 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, sv
proxier.natRules.Write( proxier.natRules.Write(
"-A", string(svcChain), "-A", string(svcChain),
"-m", "comment", "--comment", comment, "-m", "comment", "--comment", comment,
"-m", "recent", "--name", string(epInfo.ChainName), "-m", "recent", "--name", string(epInfo.chainName),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(epInfo.ChainName), "-j", string(epInfo.chainName),
) )
} }
} }
@@ -1393,14 +1394,14 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, sv
"-m", "statistic", "-m", "statistic",
"--mode", "random", "--mode", "random",
"--probability", proxier.probability(numEndpoints-i), "--probability", proxier.probability(numEndpoints-i),
"-j", string(epInfo.ChainName), "-j", string(epInfo.chainName),
) )
} else { } else {
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
proxier.natRules.Write( proxier.natRules.Write(
"-A", string(svcChain), "-A", string(svcChain),
"-m", "comment", "--comment", comment, "-m", "comment", "--comment", comment,
"-j", string(epInfo.ChainName), "-j", string(epInfo.chainName),
) )
} }
} }