From 6cff4153057aca0b5a3b816263fd7eb1129fd3da Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 17 May 2023 20:28:57 -0400 Subject: [PATCH] Port service/endpoint chain creation/cleanup to nftables --- pkg/proxy/nftables/proxier.go | 304 +++++++++++++++++------------ pkg/proxy/nftables/proxier_test.go | 259 ++++++++++++++++++++++-- 2 files changed, 420 insertions(+), 143 deletions(-) diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 00dc5f3b097..531a88764b4 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -21,7 +21,6 @@ package nftables // import ( - "bytes" "context" "crypto/sha256" "encoding/base32" @@ -103,12 +102,13 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro // Store the following for performance reasons. svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} svcPortName := proxy.ServicePortName{NamespacedName: svcName, Port: port.Name} - protocol := strings.ToLower(string(svcPort.Protocol())) svcPort.nameString = svcPortName.String() - svcPort.clusterPolicyChainName = servicePortPolicyClusterChain(svcPort.nameString, protocol) - svcPort.localPolicyChainName = servicePortPolicyLocalChainName(svcPort.nameString, protocol) - svcPort.firewallChainName = serviceFirewallChainName(svcPort.nameString, protocol) - svcPort.externalChainName = serviceExternalChainName(svcPort.nameString, protocol) + + chainNameBase := servicePortChainNameBase(&svcPortName, strings.ToLower(string(svcPort.Protocol()))) + svcPort.clusterPolicyChainName = servicePortPolicyClusterChainNamePrefix + chainNameBase + svcPort.localPolicyChainName = servicePortPolicyLocalChainNamePrefix + chainNameBase + svcPort.firewallChainName = serviceFirewallChainNamePrefix + chainNameBase + svcPort.externalChainName = serviceExternalChainNamePrefix + chainNameBase return svcPort } @@ -124,7 +124,7 @@ type endpointInfo struct { func newEndpointInfo(baseInfo 
*proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint { return &endpointInfo{ BaseEndpointInfo: baseInfo, - chainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()), + chainName: servicePortEndpointChainNamePrefix + servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()), } } @@ -174,11 +174,8 @@ type Proxier struct { // The following buffers are used to reuse memory and avoid allocations // that are significantly impacting performance. - iptablesData *bytes.Buffer - filterChains proxyutil.LineBuffer - filterRules proxyutil.LineBuffer - natChains proxyutil.LineBuffer - natRules proxyutil.LineBuffer + filterRules proxyutil.LineBuffer + natRules proxyutil.LineBuffer // conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal conntrackTCPLiberal bool @@ -188,6 +185,9 @@ type Proxier struct { // networkInterfacer defines an interface for several net library functions. // Inject for test purpose. 
networkInterfacer proxyutil.NetworkInterfacer + + // staleChains contains information about chains to be deleted later + staleChains map[string]time.Time } // Proxier implements proxy.Provider @@ -263,14 +263,12 @@ func NewProxier(ipFamily v1.IPFamily, serviceHealthServer: serviceHealthServer, healthzServer: healthzServer, precomputedProbabilities: make([]string, 0, 1001), - iptablesData: bytes.NewBuffer(nil), - filterChains: proxyutil.NewLineBuffer(), filterRules: proxyutil.NewLineBuffer(), - natChains: proxyutil.NewLineBuffer(), natRules: proxyutil.NewLineBuffer(), nodePortAddresses: nodePortAddresses, networkInterfacer: proxyutil.RealNetwork{}, conntrackTCPLiberal: conntrackTCPLiberal, + staleChains: make(map[string]time.Time), } burstSyncs := 2 @@ -667,75 +665,113 @@ func (proxier *Proxier) OnNodeDelete(node *v1.Node) { func (proxier *Proxier) OnNodeSynced() { } -// portProtoHash takes the ServicePortName and protocol for a service -// returns the associated 16 character hash. This is computed by hashing (sha256) -// then encoding to base32 and truncating to 16 chars. We do this because IPTables -// Chain Names must be <= 28 chars long, and the longer they are the harder they are to read. -func portProtoHash(servicePortName string, protocol string) string { - hash := sha256.Sum256([]byte(servicePortName + protocol)) - encoded := base32.StdEncoding.EncodeToString(hash[:]) - return encoded[:16] -} - const ( - servicePortPolicyClusterChainNamePrefix = "KUBE-SVC-" - servicePortPolicyLocalChainNamePrefix = "KUBE-SVL-" - serviceFirewallChainNamePrefix = "KUBE-FW-" - serviceExternalChainNamePrefix = "KUBE-EXT-" - servicePortEndpointChainNamePrefix = "KUBE-SEP-" + // Maximum length for one of our chain name prefixes, including the trailing + // hyphen. + chainNamePrefixLengthMax = 16 + + // Maximum length of the string returned from servicePortChainNameBase or + // servicePortEndpointChainNameBase. 
+ chainNameBaseLengthMax = knftables.NameLengthMax - chainNamePrefixLengthMax ) -// servicePortPolicyClusterChain returns the name of the KUBE-SVC-XXXX chain for a service, which is the -// main iptables chain for that service, used for dispatching to endpoints when using `Cluster` -// traffic policy. -func servicePortPolicyClusterChain(servicePortName string, protocol string) string { - return servicePortPolicyClusterChainNamePrefix + portProtoHash(servicePortName, protocol) -} +const ( + servicePortPolicyClusterChainNamePrefix = "service-" + servicePortPolicyLocalChainNamePrefix = "local-" + serviceFirewallChainNamePrefix = "firewall-" + serviceExternalChainNamePrefix = "external-" + servicePortEndpointChainNamePrefix = "endpoint-" +) -// servicePortPolicyLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which -// handles dispatching to local endpoints when using `Local` traffic policy. This chain only -// exists if the service has `Local` internal or external traffic policy. -func servicePortPolicyLocalChainName(servicePortName string, protocol string) string { - return servicePortPolicyLocalChainNamePrefix + portProtoHash(servicePortName, protocol) -} - -// serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which -// is used to implement the filtering for the LoadBalancerSourceRanges feature. -func serviceFirewallChainName(servicePortName string, protocol string) string { - return serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol) -} - -// serviceExternalChainName returns the name of the KUBE-EXT-XXXX chain for a service, which -// implements "short-circuiting" for internally-originated external-destination traffic when using -// `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX -// chain and traffic from external sources to the KUBE-SVL-XXXX chain. 
-func serviceExternalChainName(servicePortName string, protocol string) string { - return serviceExternalChainNamePrefix + portProtoHash(servicePortName, protocol) -} - -// servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular -// service endpoint. -func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) string { - hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint)) +// hashAndTruncate prefixes name with a hash of itself and then truncates to +// chainNameBaseLengthMax. The hash ensures that (a) the name is still unique if we have +// to truncate the end, and (b) it's visually distinguishable from other chains that would +// otherwise have nearly identical names (e.g., different endpoint chains for a given +// service that differ in only a single digit). +func hashAndTruncate(name string) string { + hash := sha256.Sum256([]byte(name)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return servicePortEndpointChainNamePrefix + encoded[:16] + name = encoded[:8] + "-" + name + if len(name) > chainNameBaseLengthMax { + name = name[:chainNameBaseLengthMax-3] + "..." + } + return name +} + +// servicePortChainNameBase returns the base name for a chain for the given ServicePort. +// This is something like "HASH-namespace/serviceName/protocol/portName", e.g., +// "ULMVA6XW-ns1/svc1/tcp/p80". +func servicePortChainNameBase(servicePortName *proxy.ServicePortName, protocol string) string { + // nftables chains can contain the characters [A-Za-z0-9_./-] (but must start with + // a letter, underscore, or dot). + // + // Namespace, Service, and Port names can contain [a-z0-9-] (with some additional + // restrictions that aren't relevant here). + // + // Protocol is /(tcp|udp|sctp)/. 
+ // + // Thus, we can safely use all Namespace names, Service names, protocol values, + // and Port names directly in nftables chain names (though note that this assumes + // that the chain name won't *start* with any of those strings, since that might + // be illegal). We use "/" to separate the parts of the name, which is one of the + // two characters allowed in a chain name that aren't allowed in our input strings. + + name := fmt.Sprintf("%s/%s/%s/%s", + servicePortName.NamespacedName.Namespace, + servicePortName.NamespacedName.Name, + protocol, + servicePortName.Port, + ) + + // The namespace, service, and port name can each be up to 63 characters, protocol + // can be up to 4, plus 8 for the hash and 4 additional punctuation characters. + // That's a total of 205, which is less than chainNameBaseLengthMax (240). So this + // will never actually return a truncated name. + return hashAndTruncate(name) +} + +// servicePortEndpointChainNameBase returns the suffix for chain names for the given +// endpoint. This is something like +// "HASH-namespace/serviceName/protocol/portName__endpointIP/endpointPort", e.g., +// "5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80". +func servicePortEndpointChainNameBase(servicePortName *proxy.ServicePortName, protocol, endpoint string) string { + // As above in servicePortChainNameBase: Namespace, Service, Port, Protocol, and + // EndpointPort are all safe to copy into the chain name directly. But if + // EndpointIP is IPv6 then it will contain colons, which aren't allowed in a chain + // name. IPv6 IPs are also quite long, but we can't safely truncate them (e.g. to + // only the final segment) because (especially for manually-created external + // endpoints), we can't know for sure that any part of them is redundant. 
+ + endpointIP, endpointPort, _ := net.SplitHostPort(endpoint) + if strings.Contains(endpointIP, ":") { + endpointIP = strings.ReplaceAll(endpointIP, ":", ".") + } + + // As above, we use "/" to separate parts of the name, and "__" to separate the + // "service" part from the "endpoint" part. + name := fmt.Sprintf("%s/%s/%s/%s__%s/%s", + servicePortName.NamespacedName.Namespace, + servicePortName.NamespacedName.Name, + protocol, + servicePortName.Port, + endpointIP, + endpointPort, + ) + + // The part of name before the "__" can be up to 205 characters (as with + // servicePortChainNameBase above). An IPv6 address can be up to 39 characters, and + // a port can be up to 5 digits, plus 3 punctuation characters gives a max total + // length of 252, which is over chainNameBaseLengthMax (240), so truncation is + // theoretically possible (though incredibly unlikely). + return hashAndTruncate(name) } func isServiceChainName(chainString string) bool { - prefixes := []string{ - servicePortPolicyClusterChainNamePrefix, - servicePortPolicyLocalChainNamePrefix, - servicePortEndpointChainNamePrefix, - serviceFirewallChainNamePrefix, - serviceExternalChainNamePrefix, - } - - for _, p := range prefixes { - if strings.HasPrefix(chainString, p) { - return true - } - } - return false + // The chain names returned from servicePortChainNameBase and + // servicePortEndpointChainNameBase will always have at least one "/" in them. + // Since none of our "stock" chain names use slashes, we can distinguish them this + // way. + return strings.Contains(chainString, "/") } // This is where all of the nftables calls happen. @@ -774,18 +810,45 @@ func (proxier *Proxier) syncProxyRules() { } }() + // If there are sufficiently-stale chains left over from previous transactions, + // try to delete them now. 
+ if len(proxier.staleChains) > 0 { + oneSecondAgo := start.Add(-time.Second) + tx := proxier.nftables.NewTransaction() + deleted := 0 + for chain, modtime := range proxier.staleChains { + if modtime.Before(oneSecondAgo) { + tx.Delete(&knftables.Chain{ + Name: chain, + }) + delete(proxier.staleChains, chain) + deleted++ + } + } + if deleted > 0 { + klog.InfoS("Deleting stale nftables chains", "numChains", deleted) + err := proxier.nftables.Run(context.TODO(), tx) + if err != nil { + // We already deleted the entries from staleChains, but if + // the chains still exist, they'll just get added back + // (with a later timestamp) at the end of the sync. + klog.ErrorS(err, "Unable to delete stale chains; will retry later") + // FIXME: metric + } + } + } + + // Now start the actual syncing transaction tx := proxier.nftables.NewTransaction() proxier.setupNFTables(tx) // Reset all buffers used later. // This is to avoid memory reallocations and thus improve performance. - proxier.filterChains.Reset() proxier.filterRules.Reset() - proxier.natChains.Reset() proxier.natRules.Reset() - // Accumulate NAT chains to keep. - activeNATChains := sets.New[string]() + // Accumulate service/endpoint chains to keep. + activeChains := sets.New[string]() // Compute total number of endpoint chains across all services // to get a sense of how big the cluster is. 
@@ -819,7 +882,7 @@ func (proxier *Proxier) syncProxyRules() { // Note the endpoint chains that will be used for _, ep := range allLocallyReachableEndpoints { if epInfo, ok := ep.(*endpointInfo); ok { - activeNATChains.Insert(epInfo.chainName) + ensureChain(epInfo.chainName, tx, activeChains) } } @@ -827,14 +890,14 @@ func (proxier *Proxier) syncProxyRules() { clusterPolicyChain := svcInfo.clusterPolicyChainName usesClusterPolicyChain := len(clusterEndpoints) > 0 && svcInfo.UsesClusterEndpoints() if usesClusterPolicyChain { - activeNATChains.Insert(clusterPolicyChain) + ensureChain(clusterPolicyChain, tx, activeChains) } // localPolicyChain contains the endpoints used with "Local" traffic policy localPolicyChain := svcInfo.localPolicyChainName usesLocalPolicyChain := len(localEndpoints) > 0 && svcInfo.UsesLocalEndpoints() if usesLocalPolicyChain { - activeNATChains.Insert(localPolicyChain) + ensureChain(localPolicyChain, tx, activeChains) } // internalPolicyChain is the chain containing the endpoints for @@ -876,7 +939,7 @@ func (proxier *Proxier) syncProxyRules() { // are no externally-usable endpoints. usesExternalTrafficChain := hasEndpoints && svcInfo.ExternallyAccessible() if usesExternalTrafficChain { - activeNATChains.Insert(externalTrafficChain) + ensureChain(externalTrafficChain, tx, activeChains) } // Traffic to LoadBalancer IPs can go directly to externalTrafficChain @@ -886,7 +949,7 @@ func (proxier *Proxier) syncProxyRules() { fwChain := svcInfo.firewallChainName usesFWChain := hasEndpoints && len(svcInfo.LoadBalancerVIPStrings()) > 0 && len(svcInfo.LoadBalancerSourceRanges()) > 0 if usesFWChain { - activeNATChains.Insert(fwChain) + ensureChain(fwChain, tx, activeChains) loadBalancerTrafficChain = fwChain } @@ -1069,8 +1132,6 @@ func (proxier *Proxier) syncProxyRules() { // jump to externalTrafficChain, which will handle some special cases and // then jump to externalPolicyChain. 
if usesExternalTrafficChain { - proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(externalTrafficChain))) - if !svcInfo.ExternalPolicyLocal() { // If we are using non-local endpoints we need to masquerade, // in case we cross nodes. @@ -1123,8 +1184,6 @@ func (proxier *Proxier) syncProxyRules() { // Set up firewall chain, if needed if usesFWChain { - proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(fwChain))) - // The service firewall rules are created based on the // loadBalancerSourceRanges field. This only works for VIP-like // loadbalancers that preserve source IPs. For loadbalancers which @@ -1171,17 +1230,15 @@ func (proxier *Proxier) syncProxyRules() { ) } - // If Cluster policy is in use, create the chain and create rules jumping - // from clusterPolicyChain to the clusterEndpoints + // If Cluster policy is in use, create rules jumping from + // clusterPolicyChain to the clusterEndpoints if usesClusterPolicyChain { - proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(clusterPolicyChain))) proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints) } - // If Local policy is in use, create the chain and create rules jumping - // from localPolicyChain to the localEndpoints + // If Local policy is in use, create rules jumping from localPolicyChain + // to the localEndpoints if usesLocalPolicyChain { - proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(localPolicyChain))) proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints) } @@ -1195,10 +1252,6 @@ func (proxier *Proxier) syncProxyRules() { endpointChain := epInfo.chainName - // Create the endpoint chain - proxier.natChains.Write(utiliptables.MakeChainLine(utiliptables.Chain(endpointChain))) - activeNATChains.Insert(endpointChain) - // Handle traffic that loops back to the originator with SNAT. 
proxier.natRules.Write( "-A", string(endpointChain), @@ -1220,33 +1273,6 @@ func (proxier *Proxier) syncProxyRules() { } } - // Delete chains no longer in use. - deletedChains := 0 - var existingNATChains map[utiliptables.Chain]struct{} - - proxier.iptablesData.Reset() - if err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData); err == nil { - existingNATChains = utiliptables.GetChainsFromTable(proxier.iptablesData.Bytes()) - - for chain := range existingNATChains { - if !activeNATChains.Has(string(chain)) { - chainString := string(chain) - if !isServiceChainName(chainString) { - // Ignore chains that aren't ours. - continue - } - // We must (as per iptables) write a chain-line - // for it, which has the nice effect of flushing - // the chain. Then we can remove the chain. - proxier.natChains.Write(utiliptables.MakeChainLine(chain)) - proxier.natRules.Write("-X", chainString) - deletedChains++ - } - } - } else { - klog.ErrorS(err, "Failed to execute iptables-save: stale chains will not be deleted") - } - // Finally, tail-call to the nodePorts chain. This needs to be after all // other service portal rules. if proxier.nodePortAddresses.MatchAll() { @@ -1285,23 +1311,41 @@ func (proxier *Proxier) syncProxyRules() { } } + // Figure out which chains are now stale. Unfortunately, we can't delete them + // right away, because with kernels before 6.2, if there is a map element pointing + // to a chain, and you delete that map element, the kernel doesn't notice until a + // short amount of time later that the chain is now unreferenced. So we flush them + // now, and record the time that they become stale in staleChains so they can be + // deleted later. 
+ existingChains, err := proxier.nftables.List(context.TODO(), "chains") + if err == nil { + for _, chain := range existingChains { + if isServiceChainName(chain) && !activeChains.Has(chain) { + tx.Flush(&knftables.Chain{ + Name: chain, + }) + proxier.staleChains[chain] = start + } + } + } else if !knftables.IsNotFound(err) { + klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted") + } + metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines())) - metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains)) + metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines())) // Sync rules. klog.V(2).InfoS("Reloading service nftables data", "numServices", len(proxier.svcPortMap), "numEndpoints", totalEndpoints, - "numFilterChains", proxier.filterChains.Lines(), "numFilterRules", proxier.filterRules.Lines(), - "numNATChains", proxier.natChains.Lines(), "numNATRules", proxier.natRules.Lines(), ) // FIXME // klog.V(9).InfoS("Running nftables transaction", "transaction", tx.Bytes()) - err := proxier.nftables.Run(context.TODO(), tx) + err = proxier.nftables.Run(context.TODO(), tx) if err != nil { klog.ErrorS(err, "nftables sync failed") metrics.IptablesRestoreFailuresTotal.Inc() diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index 45c0b805f31..d10a81c3aa7 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package nftables import ( - "bytes" "fmt" "net" "reflect" @@ -322,14 +321,12 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { hostname: testHostname, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), precomputedProbabilities: make([]string, 0, 1001), - iptablesData: bytes.NewBuffer(nil), - filterChains: proxyutil.NewLineBuffer(), filterRules: proxyutil.NewLineBuffer(), - natChains: proxyutil.NewLineBuffer(), natRules: proxyutil.NewLineBuffer(), nodeIP: netutils.ParseIPSloppy(testNodeIP), nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil), networkInterfacer: networkInterfacer, + staleChains: make(map[string]time.Time), } p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) @@ -534,6 +531,32 @@ func TestOverallNFTablesRules(t *testing.T) { add rule ip kube-proxy nat-postrouting jump masquerading add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; } add rule ip kube-proxy nat-prerouting jump services + + # svc1 + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 + + # svc2 + add chain ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 + add chain ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 + add chain ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 + + # svc3 + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 + + # svc4 + add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add chain ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 + add chain ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 + add chain ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 + + # svc5 + add chain ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 + add chain 
ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 + add chain ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 + add chain ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4229,6 +4252,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) { add rule ip kube-proxy nat-prerouting jump services `) + // Helper function to make it look like time has passed (from the point of view of + // the stale-chain-deletion code). + ageStaleChains := func() { + for chain, t := range fp.staleChains { + fp.staleChains[chain] = t.Add(-2 * time.Second) + } + } + // Create initial state var svc2 *v1.Service @@ -4282,8 +4313,12 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.syncProxyRules() expected := baseRules + dedent.Dedent(` - # FIXME - `) + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 + add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 + `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Add a new service and its endpoints @@ -4316,16 +4351,30 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.syncProxyRules() expected = baseRules + dedent.Dedent(` - # FIXME + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 + add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) - // Delete a service. + // Delete a service; its chains will be flushed, but not immediately deleted. 
fp.OnServiceDelete(svc2) fp.syncProxyRules() + assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + ageStaleChains() + fp.syncProxyRules() expected = baseRules + dedent.Dedent(` - # FIXME + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4343,7 +4392,11 @@ func TestSyncProxyRulesRepeated(t *testing.T) { ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` - # FIXME + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4362,7 +4415,14 @@ func TestSyncProxyRulesRepeated(t *testing.T) { ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` - # FIXME + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + + add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4372,11 +4432,23 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.OnEndpointSliceUpdate(eps3, eps3update) fp.syncProxyRules() + // The old endpoint chain (for 10.0.3.1) will not be deleted yet. 
expected = baseRules + dedent.Dedent(` - # FIXME + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 + + add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // (Ensure the old svc3 chain gets deleted in the next sync.) + ageStaleChains() + // Add an endpoint to a service. eps3update2 := eps3update.DeepCopy() eps3update2.Endpoints = append(eps3update2.Endpoints, discovery.Endpoint{Addresses: []string{"10.0.3.3"}}) @@ -4384,7 +4456,15 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.syncProxyRules() expected = baseRules + dedent.Dedent(` - # FIXME + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 + add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 + + add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4670,3 +4750,156 @@ func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) { }) } } + +func Test_servicePortChainNameBase(t *testing.T) { + testCases := []struct { + name string + spn proxy.ServicePortName + protocol string + expected string + }{ + { + name: "simple", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "testing", + Name: "service", + }, + Port: "http", + }, + protocol: "tcp", + expected: 
"P4ZYZVCF-testing/service/tcp/http", + }, + { + name: "different port, different hash", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "testing", + Name: "service", + }, + Port: "https", + }, + protocol: "tcp", + expected: "LZBRENCP-testing/service/tcp/https", + }, + { + name: "max length", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", + Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", + }, + Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", + }, + protocol: "sctp", + expected: "KR6NACJP-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + name := servicePortChainNameBase(&tc.spn, tc.protocol) + if name != tc.expected { + t.Errorf("expected %q, got %q", tc.expected, name) + } + }) + } +} + +func Test_servicePortEndpointChainNameBase(t *testing.T) { + testCases := []struct { + name string + spn proxy.ServicePortName + protocol string + endpoint string + expected string + }{ + { + name: "simple", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "testing", + Name: "service", + }, + Port: "http", + }, + protocol: "tcp", + endpoint: "10.180.0.1:80", + expected: "JO2XBXZR-testing/service/tcp/http__10.180.0.1/80", + }, + { + name: "different endpoint, different hash", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "testing", + Name: "service", + }, + Port: "http", + }, + protocol: "tcp", + endpoint: "10.180.0.2:80", + expected: "5S6H3H22-testing/service/tcp/http__10.180.0.2/80", + }, + { + name: "ipv6", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: 
"testing", + Name: "service", + }, + Port: "http", + }, + protocol: "tcp", + endpoint: "[fd80:abcd:12::a1b2:c3d4:e5f6:9999]:80", + expected: "U7E2ET36-testing/service/tcp/http__fd80.abcd.12..a1b2.c3d4.e5f6.9999/80", + }, + { + name: "max length without truncation", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", + Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", + }, + Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", + }, + protocol: "sctp", + endpoint: "[1234:5678:9abc:def0::abc:1234]:443", + expected: "5YS7AFEA-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abc.1234/443", + }, + { + name: "truncated, 1", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", + Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", + }, + Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", + }, + protocol: "sctp", + endpoint: "[1234:5678:9abc:def0::abcd:1234:5678]:443", + expected: "CI6C53Q3-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abcd.1234...", + }, + { + name: "truncated, 2 (different IP, which is not visible in the result)", + spn: proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: "very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx", + Name: "very-long-service-name-why-would-you-even-do-this-i-mean-really", + }, + Port: "port-443-providing-the-hypertext-transmission-protocol-with-tls", + }, + protocol: "sctp", + 
endpoint: "[1234:5678:9abc:def0::abcd:1234:8765]:443", + expected: "2FLXFK6X-very-long-namespace-name-abcdefghijklmnopqrstuvwxyz0123456789xx/very-long-service-name-why-would-you-even-do-this-i-mean-really/sctp/port-443-providing-the-hypertext-transmission-protocol-with-tls__1234.5678.9abc.def0..abcd.1234...", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + name := servicePortEndpointChainNameBase(&tc.spn, tc.protocol, tc.endpoint) + if name != tc.expected { + t.Errorf("expected %q, got %q", tc.expected, name) + } + }) + } +}