mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Port service/endpoint chain creation/cleanup to nftables
This commit is contained in:
parent
2735ad541e
commit
6cff415305
@ -21,7 +21,6 @@ package nftables
|
||||
//
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/base32"
|
||||
@ -103,12 +102,13 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro
|
||||
// Store the following for performance reasons.
|
||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name}
|
||||
protocol := strings.ToLower(string(svcPort.Protocol()))
|
||||
svcPort.nameString = svcPortName.String()
|
||||
svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol)
|
||||
svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol)
|
||||
svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol)
|
||||
svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol)
|
||||
|
||||
chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol())))
|
||||
svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase
|
||||
svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase
|
||||
svcPort.firewallChainName = serviceFirewallChainNamePrefix + chainNameBase
|
||||
svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase
|
||||
|
||||
return svcPort
|
||||
}
|
||||
@ -124,7 +124,7 @@ type endpointInfo struct {
|
||||
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
|
||||
return &endpointInfo{
|
||||
BaseEndpointInfo: baseInfo,
|
||||
chainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
|
||||
chainName: servicePortEndpointChainNamePrefix + servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()),
|
||||
}
|
||||
}
|
||||
|
||||
@ -174,11 +174,8 @@ type Proxier struct {
|
||||
|
||||
// The following buffers are used to reuse memory and avoid allocations
|
||||
// that are significantly impacting performance.
|
||||
iptablesData *bytes.Buffer
|
||||
filterChains proxyutil.LineBuffer
|
||||
filterRules proxyutil.LineBuffer
|
||||
natChains proxyutil.LineBuffer
|
||||
natRules proxyutil.LineBuffer
|
||||
filterRules proxyutil.LineBuffer
|
||||
natRules proxyutil.LineBuffer
|
||||
|
||||
// conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal
|
||||
conntrackTCPLiberal bool
|
||||
@ -188,6 +185,9 @@ type Proxier struct {
|
||||
// networkInterfacer defines an interface for several net library functions.
|
||||
// Inject for test purpose.
|
||||
networkInterfacer proxyutil.NetworkInterfacer
|
||||
|
||||
// staleChains contains information about chains to be deleted later
|
||||
staleChains map[string]time.Time
|
||||
}
|
||||
|
||||
// Proxier implements proxy.Provider
|
||||
@ -263,14 +263,12 @@ func NewProxier(ipFamily v1.IPFamily,
|
||||
serviceHealthServer: serviceHealthServer,
|
||||
healthzServer: healthzServer,
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChains: proxyutil.NewLineBuffer(),
|
||||
filterRules: proxyutil.NewLineBuffer(),
|
||||
natChains: proxyutil.NewLineBuffer(),
|
||||
natRules: proxyutil.NewLineBuffer(),
|
||||
nodePortAddresses: nodePortAddresses,
|
||||
networkInterfacer: proxyutil.RealNetwork{},
|
||||
conntrackTCPLiberal: conntrackTCPLiberal,
|
||||
staleChains: make(map[string]time.Time),
|
||||
}
|
||||
|
||||
burstSyncs := 2
|
||||
@ -667,75 +665,113 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) {
|
||||
func (proxier *Proxier) OnNodeSynced() {
|
||||
}
|
||||
|
||||
// portProtoHash takes the ServicePortName and protocol for a service
|
||||
// returns the associated 16 character hash. This is computed by hashing (sha256)
|
||||
// then encoding to base32 and truncating to 16 chars. We do this because IPTables
|
||||
// Chain Names must be <= 28 chars long, and the longer they are the harder they are to read.
|
||||
func portProtoHash(servicePortName string, protocol string) string {
|
||||
hash := sha256.Sum256([]byte(servicePortName + protocol))
|
||||
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
||||
return encoded[:16]
|
||||
}
|
||||
|
||||
const (
|
||||
servicePortPolicyClusterChainNamePrefix = "KUBE-SVC-"
|
||||
servicePortPolicyLocalChainNamePrefix = "KUBE-SVL-"
|
||||
serviceFirewallChainNamePrefix = "KUBE-FW-"
|
||||
serviceExternalChainNamePrefix = "KUBE-EXT-"
|
||||
servicePortEndpointChainNamePrefix = "KUBE-SEP-"
|
||||
// Maximum length for one of our chain name prefixes, including the trailing
|
||||
// hyphen.
|
||||
chainNamePrefixLengthMax = 16
|
||||
|
||||
// Maximum length of the string returned from servicePortChainNameBase or
|
||||
// servicePortEndpointChainNameBase.
|
||||
chainNameBaseLengthMax = knftables.NameLengthMax - chainNamePrefixLengthMax
|
||||
)
|
||||
|
||||
// servicePortPolicyClusterChain returns the name of the KUBE-SVC-XXXX chain for a service, which is the
|
||||
// main iptables chain for that service, used for dispatching to endpoints when using `Cluster`
|
||||
// traffic policy.
|
||||
func servicePortPolicyClusterChain(servicePortName string, protocol string) string {
|
||||
return servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol)
|
||||
}
|
||||
const (
|
||||
servicePortPolicyClusterChainNamePrefix = "service-"
|
||||
servicePortPolicyLocalChainNamePrefix = "local-"
|
||||
serviceFirewallChainNamePrefix = "firewall-"
|
||||
serviceExternalChainNamePrefix = "external-"
|
||||
servicePortEndpointChainNamePrefix = "endpoint-"
|
||||
)
|
||||
|
||||
// servicePortPolicyLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which
|
||||
// handles dispatching to local endpoints when using `Local` traffic policy. This chain only
|
||||
// exists if the service has `Local` internal or external traffic policy.
|
||||
func servicePortPolicyLocalChainName(servicePortName string, protocol string) string {
|
||||
return servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol)
|
||||
}
|
||||
|
||||
// serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which
|
||||
// is used to implement the filtering for the LoadBalancerSourceRanges feature.
|
||||
func serviceFirewallChainName(servicePortName string, protocol string) string {
|
||||
return serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol)
|
||||
}
|
||||
|
||||
// serviceExternalChainName returns the name of the KUBE-EXT-XXXX chain for a service, which
|
||||
// implements "short-circuiting" for internally-originated external-destination traffic when using
|
||||
// `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX
|
||||
// chain and traffic from external sources to the KUBE-SVL-XXXX chain.
|
||||
func serviceExternalChainName(servicePortName string, protocol string) string {
|
||||
return serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol)
|
||||
}
|
||||
|
||||
// servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular
|
||||
// service endpoint.
|
||||
func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) string {
|
||||
hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
|
||||
// hashAndTruncate prefixes name with a hash of itself and then truncates to
|
||||
// chainNameBaseLengthMax. The hash ensures that (a) the name is still unique if we have
|
||||
// to truncate the end, and (b) it's visually distinguishable from other chains that would
|
||||
// otherwise have nearly identical names (e.g., different endpoint chains for a given
|
||||
// service that differ in only a single digit).
|
||||
func hashAndTruncate(name string) string {
|
||||
hash := sha256.Sum256([]byte(name))
|
||||
encoded := base32.StdEncoding.EncodeToString(hash[:])
|
||||
return servicePortEndpointChainNamePrefix + encoded[:16]
|
||||
name = encoded[:8] + "-" + name
|
||||
if len(name) > chainNameBaseLengthMax {
|
||||
name = name[:chainNameBaseLengthMax-3] + "..."
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
// servicePortChainNameBase returns the base name for a chain for the given ServicePort.
|
||||
// This is something like "HASH-namespace/serviceName/protocol/portName", e.g,
|
||||
// "ULMVA6XW-ns1/svc1/tcp/p80".
|
||||
func servicePortChainNameBase(servicePortName *proxy.ServicePortName, protocol string) string {
|
||||
// nftables chains can contain the characters [A-Za-z0-9_./-] (but must start with
|
||||
// a letter, underscore, or dot).
|
||||
//
|
||||
// Namespace, Service, and Port names can contain [a-z0-9-] (with some additional
|
||||
// restrictions that aren't relevant here).
|
||||
//
|
||||
// Protocol is /(tcp|udp|sctp)/.
|
||||
//
|
||||
// Thus, we can safely use all Namespace names, Service names, protocol values,
|
||||
// and Port names directly in nftables chain names (though note that this assumes
|
||||
// that the chain name won't *start* with any of those strings, since that might
|
||||
// be illegal). We use "/" to separate the parts of the name, which is one of the
|
||||
// two characters allowed in a chain name that isn't allowed in our input strings.
|
||||
|
||||
name := fmt.Sprintf("%s/%s/%s/%s",
|
||||
servicePortName.NamespacedName.Namespace,
|
||||
servicePortName.NamespacedName.Name,
|
||||
protocol,
|
||||
servicePortName.Port,
|
||||
)
|
||||
|
||||
// The namespace, service, and port name can each be up to 63 characters, protocol
|
||||
// can be up to 4, plus 8 for the hash and 4 additional punctuation characters.
|
||||
// That's a total of 205, which is less than chainNameBaseLengthMax (240). So this
|
||||
// will never actually return a truncated name.
|
||||
return hashAndTruncate(name)
|
||||
}
|
||||
|
||||
// servicePortEndpointChainNameBase returns the suffix for chain names for the given
|
||||
// endpoint. This is something like
|
||||
// "HASH-namespace/serviceName/protocol/portName__endpointIP/endpointport", e.g.,
|
||||
// "5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80".
|
||||
func servicePortEndpointChainNameBase(servicePortName *proxy.ServicePortName, protocol, endpoint string) string {
|
||||
// As above in servicePortChainNameBase: Namespace, Service, Port, Protocol, and
|
||||
// EndpointPort are all safe to copy into the chain name directly. But if
|
||||
// EndpointIP is IPv6 then it will contain colons, which aren't allowed in a chain
|
||||
// name. IPv6 IPs are also quite long, but we can't safely truncate them (e.g. to
|
||||
// only the final segment) because (especially for manually-created external
|
||||
// endpoints), we can't know for sure that any part of them is redundant.
|
||||
|
||||
endpointIP, endpointPort, _ := net.SplitHostPort(endpoint)
|
||||
if strings.Contains(endpointIP, ":") {
|
||||
endpointIP = strings.ReplaceAll(endpointIP, ":", ".")
|
||||
}
|
||||
|
||||
// As above, we use "/" to separate parts of the name, and "__" to separate the
|
||||
// "service" part from the "endpoint" part.
|
||||
name := fmt.Sprintf("%s/%s/%s/%s__%s/%s",
|
||||
servicePortName.NamespacedName.Namespace,
|
||||
servicePortName.NamespacedName.Name,
|
||||
protocol,
|
||||
servicePortName.Port,
|
||||
endpointIP,
|
||||
endpointPort,
|
||||
)
|
||||
|
||||
// The part of name before the "__" can be up to 205 characters (as with
|
||||
// servicePortChainNameBase above). An IPv6 address can be up to 39 characters, and
|
||||
// a port can be up to 5 digits, plus 3 punctuation characters gives a max total
|
||||
// length of 252, well over chainNameBaseLengthMax (240), so truncation is
|
||||
// theoretically possible (though incredibly unlikely).
|
||||
return hashAndTruncate(name)
|
||||
}
|
||||
|
||||
func isServiceChainName(chainString string) bool {
|
||||
prefixes := []string{
|
||||
servicePortPolicyClusterChainNamePrefix,
|
||||
servicePortPolicyLocalChainNamePrefix,
|
||||
servicePortEndpointChainNamePrefix,
|
||||
serviceFirewallChainNamePrefix,
|
||||
serviceExternalChainNamePrefix,
|
||||
}
|
||||
|
||||
for _, p := range prefixes {
|
||||
if strings.HasPrefix(chainString, p) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
// The chains returned from servicePortChainNameBase and
|
||||
// servicePortEndpointChainNameBase will always have at least one "/" in them.
|
||||
// Since none of our "stock" chain names use slashes, we can distinguish them this
|
||||
// way.
|
||||
return strings.Contains(chainString, "/")
|
||||
}
|
||||
|
||||
// This is where all of the nftables calls happen.
|
||||
@ -774,18 +810,45 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}()
|
||||
|
||||
// If there are sufficiently-stale chains left over from previous transactions,
|
||||
// try to delete them now.
|
||||
if len(proxier.staleChains) > 0 {
|
||||
oneSecondAgo := start.Add(-time.Second)
|
||||
tx := proxier.nftables.NewTransaction()
|
||||
deleted := 0
|
||||
for chain, modtime := range proxier.staleChains {
|
||||
if modtime.Before(oneSecondAgo) {
|
||||
tx.Delete(&knftables.Chain{
|
||||
Name: chain,
|
||||
})
|
||||
delete(proxier.staleChains, chain)
|
||||
deleted++
|
||||
}
|
||||
}
|
||||
if deleted > 0 {
|
||||
klog.InfoS("Deleting stale nftables chains", "numChains", deleted)
|
||||
err := proxier.nftables.Run(context.TODO(), tx)
|
||||
if err != nil {
|
||||
// We already deleted the entries from staleChains, but if
|
||||
// the chains still exist, they'll just get added back
|
||||
// (with a later timestamp) at the end of the sync.
|
||||
klog.ErrorS(err, "Unable to delete stale chains; will retry later")
|
||||
// FIXME: metric
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now start the actual syncing transaction
|
||||
tx := proxier.nftables.NewTransaction()
|
||||
proxier.setupNFTables(tx)
|
||||
|
||||
// Reset all buffers used later.
|
||||
// This is to avoid memory reallocations and thus improve performance.
|
||||
proxier.filterChains.Reset()
|
||||
proxier.filterRules.Reset()
|
||||
proxier.natChains.Reset()
|
||||
proxier.natRules.Reset()
|
||||
|
||||
// Accumulate NAT chains to keep.
|
||||
activeNATChains := sets.New[string]()
|
||||
// Accumulate service/endpoint chains to keep.
|
||||
activeChains := sets.New[string]()
|
||||
|
||||
// Compute total number of endpoint chains across all services
|
||||
// to get a sense of how big the cluster is.
|
||||
@ -819,7 +882,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Note the endpoint chains that will be used
|
||||
for _, ep := range allLocallyReachableEndpoints {
|
||||
if epInfo, ok := ep.(*endpointInfo); ok {
|
||||
activeNATChains.Insert(epInfo.chainName)
|
||||
ensureChain(epInfo.chainName, tx, activeChains)
|
||||
}
|
||||
}
|
||||
|
||||
@ -827,14 +890,14 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
clusterPolicyChain := svcInfo.clusterPolicyChainName
|
||||
usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints()
|
||||
if usesClusterPolicyChain {
|
||||
activeNATChains.Insert(clusterPolicyChain)
|
||||
ensureChain(clusterPolicyChain, tx, activeChains)
|
||||
}
|
||||
|
||||
// localPolicyChain contains the endpoints used with "Local" traffic policy
|
||||
localPolicyChain := svcInfo.localPolicyChainName
|
||||
usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints()
|
||||
if usesLocalPolicyChain {
|
||||
activeNATChains.Insert(localPolicyChain)
|
||||
ensureChain(localPolicyChain, tx, activeChains)
|
||||
}
|
||||
|
||||
// internalPolicyChain is the chain containing the endpoints for
|
||||
@ -876,7 +939,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// are no externally-usable endpoints.
|
||||
usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible()
|
||||
if usesExternalTrafficChain {
|
||||
activeNATChains.Insert(externalTrafficChain)
|
||||
ensureChain(externalTrafficChain, tx, activeChains)
|
||||
}
|
||||
|
||||
// Traffic to LoadBalancer IPs can go directly to externalTrafficChain
|
||||
@ -886,7 +949,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
fwChain := svcInfo.firewallChainName
|
||||
usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0
|
||||
if usesFWChain {
|
||||
activeNATChains.Insert(fwChain)
|
||||
ensureChain(fwChain, tx, activeChains)
|
||||
loadBalancerTrafficChain = fwChain
|
||||
}
|
||||
|
||||
@ -1069,8 +1132,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// jump to externalTrafficChain, which will handle some special cases and
|
||||
// then jump to externalPolicyChain.
|
||||
if usesExternalTrafficChain {
|
||||
proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(externalTrafficChain)))
|
||||
|
||||
if !svcInfo.ExternalPolicyLocal() {
|
||||
// If we are using non-local endpoints we need to masquerade,
|
||||
// in case we cross nodes.
|
||||
@ -1123,8 +1184,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
// Set up firewall chain, if needed
|
||||
if usesFWChain {
|
||||
proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(fwChain)))
|
||||
|
||||
// The service firewall rules are created based on the
|
||||
// loadBalancerSourceRanges field. This only works for VIP-like
|
||||
// loadbalancers that preserve source IPs. For loadbalancers which
|
||||
@ -1171,17 +1230,15 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
)
|
||||
}
|
||||
|
||||
// If Cluster policy is in use, create the chain and create rules jumping
|
||||
// from clusterPolicyChain to the clusterEndpoints
|
||||
// If Cluster policy is in use, create rules jumping from
|
||||
// clusterPolicyChain to the clusterEndpoints
|
||||
if usesClusterPolicyChain {
|
||||
proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(clusterPolicyChain)))
|
||||
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints)
|
||||
}
|
||||
|
||||
// If Local policy is in use, create the chain and create rules jumping
|
||||
// from localPolicyChain to the localEndpoints
|
||||
// If Local policy is in use, create rules jumping from localPolicyChain
|
||||
// to the localEndpoints
|
||||
if usesLocalPolicyChain {
|
||||
proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(localPolicyChain)))
|
||||
proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints)
|
||||
}
|
||||
|
||||
@ -1195,10 +1252,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
endpointChain := epInfo.chainName
|
||||
|
||||
// Create the endpoint chain
|
||||
proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(endpointChain)))
|
||||
activeNATChains.Insert(endpointChain)
|
||||
|
||||
// Handle traffic that loops back to the originator with SNAT.
|
||||
proxier.natRules.Write(
|
||||
"-A", string(endpointChain),
|
||||
@ -1220,33 +1273,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
// Delete chains no longer in use.
|
||||
deletedChains := 0
|
||||
var existingNATChains map[utiliptables.Chain]struct{}
|
||||
|
||||
proxier.iptablesData.Reset()
|
||||
if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil {
|
||||
existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes())
|
||||
|
||||
for chain := range existingNATChains {
|
||||
if !activeNATChains.Has(string(chain)) {
|
||||
chainString := string(chain)
|
||||
if !isServiceChainName(chainString) {
|
||||
// Ignore chains that aren't ours.
|
||||
continue
|
||||
}
|
||||
// We must (as per iptables) write a chain-line
|
||||
// for it, which has the nice effect of flushing
|
||||
// the chain. Then we can remove the chain.
|
||||
proxier.natChains.Write(utiliptables.MakeChainLine(chain))
|
||||
proxier.natRules.Write("-X", chainString)
|
||||
deletedChains++
|
||||
}
|
||||
}
|
||||
} else {
|
||||
klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted")
|
||||
}
|
||||
|
||||
// Finally, tail-call to the nodePorts chain. This needs to be after all
|
||||
// other service portal rules.
|
||||
if proxier.nodePortAddresses.MatchAll() {
|
||||
@ -1285,23 +1311,41 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
// Figure out which chains are now stale. Unfortunately, we can't delete them
|
||||
// right away, because with kernels before 6.2, if there is a map element pointing
|
||||
// to a chain, and you delete that map element, the kernel doesn't notice until a
|
||||
// short amount of time later that the chain is now unreferenced. So we flush them
|
||||
// now, and record the time that they become stale in staleChains so they can be
|
||||
// deleted later.
|
||||
existingChains, err := proxier.nftables.List(context.TODO(), "chains")
|
||||
if err == nil {
|
||||
for _, chain := range existingChains {
|
||||
if isServiceChainName(chain) && !activeChains.Has(chain) {
|
||||
tx.Flush(&knftables.Chain{
|
||||
Name: chain,
|
||||
})
|
||||
proxier.staleChains[chain] = start
|
||||
}
|
||||
}
|
||||
} else if !knftables.IsNotFound(err) {
|
||||
klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted")
|
||||
}
|
||||
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines()))
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains))
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines()))
|
||||
|
||||
// Sync rules.
|
||||
klog.V(2).InfoS("Reloading service nftables data",
|
||||
"numServices", len(proxier.svcPortMap),
|
||||
"numEndpoints", totalEndpoints,
|
||||
"numFilterChains", proxier.filterChains.Lines(),
|
||||
"numFilterRules", proxier.filterRules.Lines(),
|
||||
"numNATChains", proxier.natChains.Lines(),
|
||||
"numNATRules", proxier.natRules.Lines(),
|
||||
)
|
||||
|
||||
// FIXME
|
||||
// klog.V(9).InfoS("Running nftables transaction", "transaction", tx.Bytes())
|
||||
|
||||
err := proxier.nftables.Run(context.TODO(), tx)
|
||||
err = proxier.nftables.Run(context.TODO(), tx)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "nftables sync failed")
|
||||
metrics.IptablesRestoreFailuresTotal.Inc()
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package nftables
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
@ -322,14 +321,12 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
|
||||
hostname: testHostname,
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChains: proxyutil.NewLineBuffer(),
|
||||
filterRules: proxyutil.NewLineBuffer(),
|
||||
natChains: proxyutil.NewLineBuffer(),
|
||||
natRules: proxyutil.NewLineBuffer(),
|
||||
nodeIP: netutils.ParseIPSloppy(testNodeIP),
|
||||
nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil),
|
||||
networkInterfacer: networkInterfacer,
|
||||
staleChains: make(map[string]time.Time),
|
||||
}
|
||||
p.setInitialized(true)
|
||||
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
|
||||
@ -534,6 +531,32 @@ func TestOverallNFTablesRules(t *testing.T) {
|
||||
add rule ip kube-proxy nat-postrouting jump masquerading
|
||||
add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; }
|
||||
add rule ip kube-proxy nat-prerouting jump services
|
||||
|
||||
# svc1
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80
|
||||
|
||||
# svc2
|
||||
add chain ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80
|
||||
add chain ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80
|
||||
add chain ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80
|
||||
|
||||
# svc3
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80
|
||||
|
||||
# svc4
|
||||
add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80
|
||||
add chain ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80
|
||||
add chain ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80
|
||||
|
||||
# svc5
|
||||
add chain ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80
|
||||
add chain ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80
|
||||
add chain ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80
|
||||
add chain ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80
|
||||
`)
|
||||
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
@ -4229,6 +4252,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
add rule ip kube-proxy nat-prerouting jump services
|
||||
`)
|
||||
|
||||
// Helper function to make it look like time has passed (from the point of view of
|
||||
// the stale-chain-deletion code).
|
||||
ageStaleChains := func() {
|
||||
for chain, t := range fp.staleChains {
|
||||
fp.staleChains[chain] = t.Add(-2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Create initial state
|
||||
var svc2 *v1.Service
|
||||
|
||||
@ -4282,8 +4313,12 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.syncProxyRules()
|
||||
|
||||
expected := baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
`)
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
|
||||
add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
// Add a new service and its endpoints
|
||||
@ -4316,16 +4351,30 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.syncProxyRules()
|
||||
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080
|
||||
add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080
|
||||
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
// Delete a service.
|
||||
// Delete a service; its chains will be flushed, but not immediately deleted.
|
||||
fp.OnServiceDelete(svc2)
|
||||
fp.syncProxyRules()
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
ageStaleChains()
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
@ -4343,7 +4392,11 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
@ -4362,7 +4415,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
)
|
||||
fp.syncProxyRules()
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80
|
||||
|
||||
add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80
|
||||
add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
@ -4372,11 +4432,23 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.OnEndpointSliceUpdate(eps3, eps3update)
|
||||
fp.syncProxyRules()
|
||||
|
||||
// The old endpoint chain (for 10.0.3.1) will not be deleted yet.
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80
|
||||
add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80
|
||||
|
||||
add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80
|
||||
add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
// (Ensure the old svc3 chain gets deleted in the next sync.)
|
||||
ageStaleChains()
|
||||
|
||||
// Add an endpoint to a service.
|
||||
eps3update2 := eps3update.DeepCopy()
|
||||
eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}})
|
||||
@ -4384,7 +4456,15 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
|
||||
fp.syncProxyRules()
|
||||
|
||||
expected = baseRules + dedent.Dedent(`
|
||||
# FIXME
|
||||
add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80
|
||||
add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80
|
||||
|
||||
add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80
|
||||
add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80
|
||||
add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80
|
||||
|
||||
add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80
|
||||
add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80
|
||||
`)
|
||||
assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump())
|
||||
|
||||
@ -4670,3 +4750,156 @@ func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_servicePortChainNameBase(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
spn proxy.ServicePortName
|
||||
protocol string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "testing",
|
||||
Name: "service",
|
||||
},
|
||||
Port: "http",
|
||||
},
|
||||
protocol: "tcp",
|
||||
expected: "P4ZYZVCF-testing/service/tcp/http",
|
||||
},
|
||||
{
|
||||
name: "different port, different hash",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "testing",
|
||||
Name: "service",
|
||||
},
|
||||
Port: "https",
|
||||
},
|
||||
protocol: "tcp",
|
||||
expected: "LZBRENCP-testing/service/tcp/https",
|
||||
},
|
||||
{
|
||||
name: "max length",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx",
|
||||
Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really",
|
||||
},
|
||||
Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls",
|
||||
},
|
||||
protocol: "sctp",
|
||||
expected: "KR6NACJP-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
name := servicePortChainNameBase(&tc.spn, tc.protocol)
|
||||
if name != tc.expected {
|
||||
t.Errorf("expected %q, got %q", tc.expected, name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_servicePortEndpointChainNameBase(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
spn proxy.ServicePortName
|
||||
protocol string
|
||||
endpoint string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "simple",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "testing",
|
||||
Name: "service",
|
||||
},
|
||||
Port: "http",
|
||||
},
|
||||
protocol: "tcp",
|
||||
endpoint: "10.180.0.1:80",
|
||||
expected: "JO2XBXZR-testing/service/tcp/http__10.180.0.1/80",
|
||||
},
|
||||
{
|
||||
name: "different endpoint, different hash",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "testing",
|
||||
Name: "service",
|
||||
},
|
||||
Port: "http",
|
||||
},
|
||||
protocol: "tcp",
|
||||
endpoint: "10.180.0.2:80",
|
||||
expected: "5S6H3H22-testing/service/tcp/http__10.180.0.2/80",
|
||||
},
|
||||
{
|
||||
name: "ipv6",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "testing",
|
||||
Name: "service",
|
||||
},
|
||||
Port: "http",
|
||||
},
|
||||
protocol: "tcp",
|
||||
endpoint: "[fd80:abcd:12::a1b2:c3d4:e5f6:9999]:80",
|
||||
expected: "U7E2ET36-testing/service/tcp/http__fd80.abcd.12..a1b2.c3d4.e5f6.9999/80",
|
||||
},
|
||||
{
|
||||
name: "max length without truncation",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx",
|
||||
Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really",
|
||||
},
|
||||
Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls",
|
||||
},
|
||||
protocol: "sctp",
|
||||
endpoint: "[1234:5678:9abc:def0::abc:1234]:443",
|
||||
expected: "5YS7AFEA-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abc.1234/443",
|
||||
},
|
||||
{
|
||||
name: "truncated, 1",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx",
|
||||
Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really",
|
||||
},
|
||||
Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls",
|
||||
},
|
||||
protocol: "sctp",
|
||||
endpoint: "[1234:5678:9abc:def0::abcd:1234:5678]:443",
|
||||
expected: "CI6C53Q3-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abcd.1234...",
|
||||
},
|
||||
{
|
||||
name: "truncated, 2 (different IP, which is not visible in the result)",
|
||||
spn: proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx",
|
||||
Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really",
|
||||
},
|
||||
Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls",
|
||||
},
|
||||
protocol: "sctp",
|
||||
endpoint: "[1234:5678:9abc:def0::abcd:1234:8765]:443",
|
||||
expected: "2FLXFK6X-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abcd.1234...",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
name := servicePortEndpointChainNameBase(&tc.spn, tc.protocol, tc.endpoint)
|
||||
if name != tc.expected {
|
||||
t.Errorf("expected %q, got %q", tc.expected, name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user