diff --git a/pkg/proxy/nftables/helpers_test.go b/pkg/proxy/nftables/helpers_test.go index 0ca7eca330e..c6b18579606 100644 --- a/pkg/proxy/nftables/helpers_test.go +++ b/pkg/proxy/nftables/helpers_test.go @@ -62,9 +62,9 @@ var objectOrder = map[string]int{ // with per-service rules, we don't know what order syncProxyRules is going to output them // in, but the order doesn't matter anyway. So we sort the rules in those chains. var sortedChains = sets.New( - kubeServicesFilterChain, - kubeExternalServicesChain, kubeFirewallChain, + kubeServicesChain, + kubeNodePortsChain, ) // sortNFTablesTransaction sorts an nftables transaction into a standard order for comparison @@ -236,8 +236,16 @@ var destAddrRegexp = regexp.MustCompile(`^ip6* daddr (!= )?(\S+)`) var destAddrLocalRegexp = regexp.MustCompile(`^fib daddr type local`) var destPortRegexp = regexp.MustCompile(`^(tcp|udp|sctp) dport (\d+)`) +var sourceAddrRegexp = regexp.MustCompile(`^ip6* saddr (!= )?(\S+)`) +var sourceAddrLocalRegexp = regexp.MustCompile(`^fib saddr type local`) + +var endpointVMAPRegexp = regexp.MustCompile(`^numgen random mod \d+ vmap \{(.*)\}$`) +var endpointVMapEntryRegexp = regexp.MustCompile(`\d+ : goto (\S+)`) + +var masqueradeRegexp = regexp.MustCompile(`^jump ` + kubeMarkMasqChain + `$`) var jumpRegexp = regexp.MustCompile(`^(jump|goto) (\S+)$`) var verdictRegexp = regexp.MustCompile(`^(drop|reject)$`) +var dnatRegexp = regexp.MustCompile(`^meta l4proto (tcp|udp|sctp) dnat to (\S+)$`) var ignoredRegexp = regexp.MustCompile(strings.Join( []string{ @@ -251,6 +259,10 @@ var ignoredRegexp = regexp.MustCompile(strings.Join( // Likewise, this rule never matches and thus never drops anything, and so // can be ignored. `^ct state invalid drop$`, + + // We use a bare "continue" rule in the firewall chains as a place to + // attach a comment. + `^continue$`, }, "|", )) @@ -269,6 +281,11 @@ func (tracer *nftablesTracer) runChain(chname, sourceIP, protocol, destIP, destP for rule != "" { rule = strings.TrimLeft(rule, " ") + // Note that the order of (some of) the cases is important. e.g., + // masqueradeRegexp must be checked before jumpRegexp, since + // jumpRegexp would also match masqueradeRegexp but do the wrong + // thing with it. + switch { case destAddrRegexp.MatchString(rule): // `^ip6* daddr (!= )?(\S+)` @@ -302,6 +319,38 @@ func (tracer *nftablesTracer) runChain(chname, sourceIP, protocol, destIP, destP break } + case sourceAddrRegexp.MatchString(rule): + // `^ip6* saddr (!= )?(\S+)` + // Tests whether sourceIP does/doesn't match a literal. + match := sourceAddrRegexp.FindStringSubmatch(rule) + rule = strings.TrimPrefix(rule, match[0]) + not, ip := match[1], match[2] + if !tracer.addressMatches(sourceIP, not, ip) { + rule = "" + break + } + + case sourceAddrLocalRegexp.MatchString(rule): + // `^fib saddr type local` + // Tests whether sourceIP is a local IP. + match := sourceAddrLocalRegexp.FindStringSubmatch(rule) + rule = strings.TrimPrefix(rule, match[0]) + if !tracer.nodeIPs.Has(sourceIP) { + rule = "" + break + } + + case masqueradeRegexp.MatchString(rule): + // `^jump mark-for-masquerade$` + // Mark for masquerade: we just treat the jump rule itself as + // being what creates the mark, rather than trying to handle + // the rules inside that chain and the "masquerading" chain. + match := jumpRegexp.FindStringSubmatch(rule) + rule = strings.TrimPrefix(rule, match[0]) + + tracer.matches = append(tracer.matches, ruleObj.Rule) + tracer.markMasq = true + case jumpRegexp.MatchString(rule): // `^(jump|goto) (\S+)$` // Jumps to another chain. @@ -332,6 +381,35 @@ func (tracer *nftablesTracer) runChain(chname, sourceIP, protocol, destIP, destP tracer.outputs = append(tracer.outputs, strings.ToUpper(verdict)) return true + case dnatRegexp.MatchString(rule): + // `meta l4proto (tcp|udp|sctp) dnat to (\S+)` + // DNAT to an endpoint IP and terminate processing. + match := dnatRegexp.FindStringSubmatch(rule) + destEndpoint := match[2] + + tracer.matches = append(tracer.matches, ruleObj.Rule) + tracer.outputs = append(tracer.outputs, destEndpoint) + return true + + case endpointVMAPRegexp.MatchString(rule): + // `^numgen random mod \d+ vmap \{(.*)\}$` + // Selects a random endpoint and jumps to it. For tracePacket's + // purposes, we jump to *all* of the endpoints. + match := endpointVMAPRegexp.FindStringSubmatch(rule) + elements := match[1] + + for _, match = range endpointVMapEntryRegexp.FindAllStringSubmatch(elements, -1) { + // `\d+ : goto (\S+)` + destChain := match[1] + + tracer.matches = append(tracer.matches, ruleObj.Rule) + // Ignore return value; we know each endpoint has a + // terminating dnat verdict, but we want to gather all + // of the endpoints into tracer.output. + _ = tracer.runChain(destChain, sourceIP, protocol, destIP, destPort) + } + return true + default: tracer.t.Errorf("unmatched rule: %s", ruleObj.Rule) rule = "" @@ -393,10 +471,6 @@ type packetFlowTest struct { func runPacketFlowTests(t *testing.T, line string, nft *knftables.Fake, nodeIPs []string, testCases []packetFlowTest) { for _, tc := range testCases { - if tc.output != "DROP" && tc.output != "REJECT" && tc.output != "" { - t.Logf("Skipping test %s which doesn't work yet", tc.name) - continue - } t.Run(tc.name, func(t *testing.T) { protocol := strings.ToLower(string(tc.protocol)) if protocol == "" { diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index b95a627dbe6..f84d795610c 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -51,8 +51,6 @@ import ( proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" "k8s.io/kubernetes/pkg/util/async" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" - utiliptablestesting "k8s.io/kubernetes/pkg/util/iptables/testing" utilexec "k8s.io/utils/exec" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" @@ -117,14 +115,17 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *pro type endpointInfo struct { *proxy.BaseEndpointInfo - chainName string + chainName string + affinitySetName string } // returns a new proxy.Endpoint which abstracts a endpointInfo func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint { + chainNameBase := servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()) return &endpointInfo{ BaseEndpointInfo: baseInfo, - chainName: servicePortEndpointChainNamePrefix + servicePortEndpointChainNameBase(svcPortName, strings.ToLower(string(svcPortName.Protocol)), baseInfo.String()), + chainName: servicePortEndpointChainNamePrefix + chainNameBase, + affinitySetName: servicePortEndpointAffinityNamePrefix + chainNameBase, } } @@ -154,7 +155,6 @@ type Proxier struct { syncPeriod time.Duration // These are effectively const and do not need the mutex to be held. - iptables utiliptables.Interface nftables knftables.Interface masqueradeAll bool masqueradeMark string @@ -167,15 +167,6 @@ type Proxier struct { serviceHealthServer healthcheck.ServiceHealthServer healthzServer *healthcheck.ProxierHealthServer - // Since converting probabilities (floats) to strings is expensive - // and we are using only probabilities in the format of 1/n, we are - // precomputing some number of those and cache for future reuse. - precomputedProbabilities []string - - // The following buffers are used to reuse memory and avoid allocations - // that are significantly impacting performance. - natRules proxyutil.LineBuffer - // conntrackTCPLiberal indicates whether the system sets the kernel nf_conntrack_tcp_be_liberal conntrackTCPLiberal bool @@ -244,29 +235,26 @@ func NewProxier(ipFamily v1.IPFamily, } proxier := &Proxier{ - ipFamily: ipFamily, - svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), - syncPeriod: syncPeriod, - iptables: utiliptablestesting.NewFake(), - nftables: nft, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: utilexec.New(), - localDetector: localDetector, - hostname: hostname, - nodeIP: nodeIP, - recorder: recorder, - serviceHealthServer: serviceHealthServer, - healthzServer: healthzServer, - precomputedProbabilities: make([]string, 0, 1001), - natRules: proxyutil.NewLineBuffer(), - nodePortAddresses: nodePortAddresses, - networkInterfacer: proxyutil.RealNetwork{}, - conntrackTCPLiberal: conntrackTCPLiberal, - staleChains: make(map[string]time.Time), + ipFamily: ipFamily, + svcPortMap: make(proxy.ServicePortMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + syncPeriod: syncPeriod, + nftables: nft, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: utilexec.New(), + localDetector: localDetector, + hostname: hostname, + nodeIP: nodeIP, + recorder: recorder, + serviceHealthServer: serviceHealthServer, + healthzServer: healthzServer, + nodePortAddresses: nodePortAddresses, + networkInterfacer: proxyutil.RealNetwork{}, + conntrackTCPLiberal: conntrackTCPLiberal, + staleChains: make(map[string]time.Time), } burstSyncs := 2 @@ -470,28 +458,6 @@ func CleanupLeftovers() bool { return encounteredError } -func computeProbability(n int) string { - return fmt.Sprintf("%0.10f", 1.0/float64(n)) -} - -// This assumes proxier.mu is held -func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) { - if len(proxier.precomputedProbabilities) == 0 { - proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "") - } - for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ { - proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i)) - } -} - -// This assumes proxier.mu is held -func (proxier *Proxier) probability(n int) string { - if n >= len(proxier.precomputedProbabilities) { - proxier.precomputeProbabilities(n) - } - return proxier.precomputedProbabilities[n] -} - // Sync is called to synchronize the proxier state to nftables as soon as possible. func (proxier *Proxier) Sync() { if proxier.healthzServer != nil { @@ -679,6 +645,7 @@ const ( serviceFirewallChainNamePrefix = "firewall-" serviceExternalChainNamePrefix = "external-" servicePortEndpointChainNamePrefix = "endpoint-" + servicePortEndpointAffinityNamePrefix = "affinity-" ) // hashAndTruncate prefixes name with a hash of itself and then truncates to @@ -772,6 +739,10 @@ func isServiceChainName(chainString string) bool { return strings.Contains(chainString, "/") } +func isAffinitySetName(set string) bool { + return strings.HasPrefix(set, servicePortEndpointAffinityNamePrefix) +} + // This is where all of the nftables calls happen. // This assumes proxier.mu is NOT held func (proxier *Proxier) syncProxyRules() { @@ -842,16 +813,15 @@ func (proxier *Proxier) syncProxyRules() { // We need to use, eg, "ip daddr" for IPv4 but "ip6 daddr" for IPv6 ipX := "ip" + ipvX_addr := "ipv4_addr" //nolint:stylecheck // var name intentionally resembles value if proxier.ipFamily == v1.IPv6Protocol { ipX = "ip6" + ipvX_addr = "ipv6_addr" } - // Reset all buffers used later. - // This is to avoid memory reallocations and thus improve performance. - proxier.natRules.Reset() - - // Accumulate service/endpoint chains to keep. + // Accumulate service/endpoint chains and affinity sets to keep. activeChains := sets.New[string]() + activeAffinitySets := sets.New[string]() // Compute total number of endpoint chains across all services // to get a sense of how big the cluster is. @@ -990,13 +960,14 @@ func (proxier *Proxier) syncProxyRules() { // Capture the clusterIP. if hasInternalEndpoints { - proxier.natRules.Write( - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString), - "-m", protocol, "-p", protocol, - "-d", svcInfo.ClusterIP().String(), - "--dport", strconv.Itoa(svcInfo.Port()), - "-j", string(internalTrafficChain)) + tx.Add(&knftables.Rule{ + Chain: kubeServicesChain, + Rule: knftables.Concat( + ipX, "daddr", svcInfo.ClusterIP(), + protocol, "dport", svcInfo.Port(), + "goto", internalTrafficChain, + ), + }) } else { // No endpoints. tx.Add(&knftables.Rule{ @@ -1015,13 +986,14 @@ func (proxier *Proxier) syncProxyRules() { if hasEndpoints { // Send traffic bound for external IPs to the "external // destinations" chain. - proxier.natRules.Write( - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcPortNameString), - "-m", protocol, "-p", protocol, - "-d", externalIP, - "--dport", strconv.Itoa(svcInfo.Port()), - "-j", string(externalTrafficChain)) + tx.Add(&knftables.Rule{ + Chain: kubeServicesChain, + Rule: knftables.Concat( + ipX, "daddr", externalIP, + protocol, "dport", svcInfo.Port(), + "goto", externalTrafficChain, + ), + }) } if !hasExternalEndpoints { // Either no endpoints at all (REJECT) or no endpoints for @@ -1042,14 +1014,14 @@ func (proxier *Proxier) syncProxyRules() { // Capture load-balancer ingress. for _, lbip := range svcInfo.LoadBalancerVIPStrings() { if hasEndpoints { - proxier.natRules.Write( - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString), - "-m", protocol, "-p", protocol, - "-d", lbip, - "--dport", strconv.Itoa(svcInfo.Port()), - "-j", string(loadBalancerTrafficChain)) - + tx.Add(&knftables.Rule{ + Chain: kubeServicesChain, + Rule: knftables.Concat( + ipX, "daddr", lbip, + protocol, "dport", svcInfo.Port(), + "goto", loadBalancerTrafficChain, + ), + }) } if usesFWChain { comment := fmt.Sprintf("%s traffic not accepted by %s", svcPortNameString, svcInfo.firewallChainName) @@ -1087,12 +1059,13 @@ func (proxier *Proxier) syncProxyRules() { // Jump to the external destination chain. For better or for // worse, nodeports are not subect to loadBalancerSourceRanges, // and we can't change that. - proxier.natRules.Write( - "-A", string(kubeNodePortsChain), - "-m", "comment", "--comment", svcPortNameString, - "-m", protocol, "-p", protocol, - "--dport", strconv.Itoa(svcInfo.NodePort()), - "-j", string(externalTrafficChain)) + tx.Add(&knftables.Rule{ + Chain: kubeNodePortsChain, + Rule: knftables.Concat( + protocol, "dport", svcInfo.NodePort(), + "goto", externalTrafficChain, + ), + }) } if !hasExternalEndpoints { // Either no endpoints at all (REJECT) or no endpoints for @@ -1113,27 +1086,29 @@ func (proxier *Proxier) syncProxyRules() { // Set up internal traffic handling. if hasInternalEndpoints { if proxier.masqueradeAll { - proxier.natRules.Write( - "-A", string(internalTrafficChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString), - "-m", protocol, "-p", protocol, - "-d", svcInfo.ClusterIP().String(), - "--dport", strconv.Itoa(svcInfo.Port()), - "-j", string(kubeMarkMasqChain)) + tx.Add(&knftables.Rule{ + Chain: internalTrafficChain, + Rule: knftables.Concat( + ipX, "daddr", svcInfo.ClusterIP(), + protocol, "dport", svcInfo.Port(), + "jump", kubeMarkMasqChain, + ), + }) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The // idea is that you can establish a static route for your // Service range, routing to any node, and that node will // bridge into the Service for you. Since that might bounce // off-node, we masquerade here. - proxier.natRules.Write( - "-A", string(internalTrafficChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcPortNameString), - "-m", protocol, "-p", protocol, - "-d", svcInfo.ClusterIP().String(), - "--dport", strconv.Itoa(svcInfo.Port()), - proxier.localDetector.IfNotLocal(), - "-j", string(kubeMarkMasqChain)) + tx.Add(&knftables.Rule{ + Chain: internalTrafficChain, + Rule: knftables.Concat( + ipX, "daddr", svcInfo.ClusterIP(), + protocol, "dport", svcInfo.Port(), + proxier.localDetector.IfNotLocalNFT(), + "jump", kubeMarkMasqChain, + ), + }) } } @@ -1145,10 +1120,12 @@ func (proxier *Proxier) syncProxyRules() { if !svcInfo.ExternalPolicyLocal() { // If we are using non-local endpoints we need to masquerade, // in case we cross nodes. - proxier.natRules.Write( - "-A", string(externalTrafficChain), - "-m", "comment", "--comment", fmt.Sprintf(`"masquerade traffic for %s external destinations"`, svcPortNameString), - "-j", string(kubeMarkMasqChain)) + tx.Add(&knftables.Rule{ + Chain: externalTrafficChain, + Rule: knftables.Concat( + "jump", kubeMarkMasqChain, + ), + }) } else { // If we are only using same-node endpoints, we can retain the // source IP in most cases. @@ -1158,37 +1135,49 @@ func (proxier *Proxier) syncProxyRules() { // traffic as a special-case. It is subject to neither // form of traffic policy, which simulates going up-and-out // to an external load-balancer and coming back in. - proxier.natRules.Write( - "-A", string(externalTrafficChain), - "-m", "comment", "--comment", fmt.Sprintf(`"pod traffic for %s external destinations"`, svcPortNameString), - proxier.localDetector.IfLocal(), - "-j", string(clusterPolicyChain)) + tx.Add(&knftables.Rule{ + Chain: externalTrafficChain, + Rule: knftables.Concat( + proxier.localDetector.IfLocalNFT(), + "goto", clusterPolicyChain, + ), + Comment: ptr.To("short-circuit pod traffic"), + }) } // Locally originated traffic (not a pod, but the host node) // still needs masquerade because the LBIP itself is a local // address, so that will be the chosen source IP. - proxier.natRules.Write( - "-A", string(externalTrafficChain), - "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s external destinations"`, svcPortNameString), - "-m", "addrtype", "--src-type", "LOCAL", - "-j", string(kubeMarkMasqChain)) + tx.Add(&knftables.Rule{ + Chain: externalTrafficChain, + Rule: knftables.Concat( + "fib", "saddr", "type", "local", + "jump", kubeMarkMasqChain, + ), + Comment: ptr.To("masquerade local traffic"), + }) // Redirect all src-type=LOCAL -> external destination to the // policy=cluster chain. This allows traffic originating // from the host to be redirected to the service correctly. - proxier.natRules.Write( - "-A", string(externalTrafficChain), - "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s external destinations"`, svcPortNameString), - "-m", "addrtype", "--src-type", "LOCAL", - "-j", string(clusterPolicyChain)) + tx.Add(&knftables.Rule{ + Chain: externalTrafficChain, + Rule: knftables.Concat( + "fib", "saddr", "type", "local", + "goto", clusterPolicyChain, + ), + Comment: ptr.To("short-circuit local traffic"), + }) } // Anything else falls thru to the appropriate policy chain. if hasExternalEndpoints { - proxier.natRules.Write( - "-A", string(externalTrafficChain), - "-j", string(externalPolicyChain)) + tx.Add(&knftables.Rule{ + Chain: externalTrafficChain, + Rule: knftables.Concat( + "goto", externalPolicyChain, + ), + }) } } @@ -1203,12 +1192,13 @@ func (proxier *Proxier) syncProxyRules() { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges() { - proxier.natRules.Write( - "-A", string(fwChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString), - "-s", src, - "-j", string(externalTrafficChain), - ) + tx.Add(&knftables.Rule{ + Chain: fwChain, + Rule: knftables.Concat( + ipX, "saddr", src, + "goto", externalTrafficChain, + ), + }) _, cidr, err := netutils.ParseCIDRSloppy(src) if err != nil { klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) @@ -1223,36 +1213,75 @@ func (proxier *Proxier) syncProxyRules() { // need the following rules to allow requests from this node. if allowFromNode { for _, lbip := range svcInfo.LoadBalancerVIPStrings() { - proxier.natRules.Write( - "-A", string(fwChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcPortNameString), - "-s", lbip, - "-j", string(externalTrafficChain), - ) + tx.Add(&knftables.Rule{ + Chain: fwChain, + Rule: knftables.Concat( + ipX, "saddr", lbip, + "goto", externalTrafficChain, + ), + }) } } // If the packet was able to reach the end of firewall chain, // then it did not get DNATed, so it will match the // corresponding KUBE-PROXY-FIREWALL rule. - proxier.natRules.Write( - "-A", string(fwChain), - "-m", "comment", "--comment", fmt.Sprintf(`"other traffic to %s will be dropped by KUBE-PROXY-FIREWALL"`, svcPortNameString), - ) + tx.Add(&knftables.Rule{ + Chain: fwChain, + Rule: "continue", + Comment: ptr.To("other traffic will be dropped by firewall"), + }) } - // If Cluster policy is in use, create rules jumping from - // clusterPolicyChain to the clusterEndpoints + if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { + // Generate the per-endpoint affinity sets + for _, ep := range allLocallyReachableEndpoints { + epInfo, ok := ep.(*endpointInfo) + if !ok { + klog.ErrorS(nil, "Failed to cast endpointsInfo", "endpointsInfo", ep) + continue + } + + // Create a set to store current affinity mappings. As + // with the iptables backend, endpoint affinity is + // recorded for connections from a particular source IP + // (without regard to source port) to a particular + // ServicePort (without regard to which service IP was + // used to reach the service). This may be changed in the + // future. + tx.Add(&knftables.Set{ + Name: epInfo.affinitySetName, + Type: ipvX_addr, + Flags: []knftables.SetFlag{ + // The nft docs say "dynamic" is only + // needed for sets containing stateful + // objects (eg counters), but (at least on + // RHEL8) if we create the set without + // "dynamic", it later gets mutated to + // have it, and then the next attempt to + // tx.Add() it here fails because it looks + // like we're trying to change the flags. + knftables.DynamicFlag, + knftables.TimeoutFlag, + }, + Timeout: ptr.To(time.Duration(svcInfo.StickyMaxAgeSeconds()) * time.Second), + }) + activeAffinitySets.Insert(epInfo.affinitySetName) + } + } + + // If Cluster policy is in use, create the chain and create rules jumping + // from clusterPolicyChain to the clusterEndpoints if usesClusterPolicyChain { - proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints) + proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, clusterPolicyChain, clusterEndpoints) } // If Local policy is in use, create rules jumping from localPolicyChain // to the localEndpoints if usesLocalPolicyChain { - proxier.writeServiceToEndpointRules(svcPortNameString, svcInfo, localPolicyChain, localEndpoints) + proxier.writeServiceToEndpointRules(tx, svcPortNameString, svcInfo, localPolicyChain, localEndpoints) } - // Generate the per-endpoint chains. + // Generate the per-endpoint chains and affinity sets for _, ep := range allLocallyReachableEndpoints { epInfo, ok := ep.(*endpointInfo) if !ok { @@ -1263,44 +1292,56 @@ func (proxier *Proxier) syncProxyRules() { endpointChain := epInfo.chainName // Handle traffic that loops back to the originator with SNAT. - proxier.natRules.Write( - "-A", string(endpointChain), - "-m", "comment", "--comment", svcPortNameString, - "-s", epInfo.IP(), - "-j", string(kubeMarkMasqChain), - ) - commentAndAffinityArgs := []string{"-m", "comment", "--comment", svcPortNameString} + tx.Add(&knftables.Rule{ + Chain: endpointChain, + Rule: knftables.Concat( + ipX, "saddr", epInfo.IP(), + "jump", kubeMarkMasqChain, + ), + }) + + // Handle session affinity if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { - commentAndAffinityArgs = append(commentAndAffinityArgs, "-m", "recent", "--name", string(endpointChain), "--set") + tx.Add(&knftables.Rule{ + Chain: endpointChain, + Rule: knftables.Concat( + "update", "@", epInfo.affinitySetName, + "{", ipX, "saddr", "}", + ), + }) } + // DNAT to final destination. - proxier.natRules.Write( - "-A", string(endpointChain), - commentAndAffinityArgs, - "-m", protocol, "-p", protocol, - "-j", "DNAT", "--to-destination", epInfo.String(), - ) + tx.Add(&knftables.Rule{ + Chain: endpointChain, + Rule: knftables.Concat( + "meta l4proto", protocol, + "dnat to", epInfo.String(), + ), + }) } } // Finally, tail-call to the nodePorts chain. This needs to be after all // other service portal rules. if proxier.nodePortAddresses.MatchAll() { - isIPv6 := proxier.ipFamily == v1.IPv6Protocol - - destinations := []string{"-m", "addrtype", "--dst-type", "LOCAL"} // Block localhost nodePorts - if isIPv6 { - destinations = append(destinations, "!", "-d", "::1/128") + var noLocalhost string + if proxier.ipFamily == v1.IPv6Protocol { + noLocalhost = "ip6 daddr != ::1" } else { - destinations = append(destinations, "!", "-d", "127.0.0.0/8") + noLocalhost = "ip daddr != 127.0.0.0/8" } - proxier.natRules.Write( - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, - destinations, - "-j", string(kubeNodePortsChain)) + tx.Add(&knftables.Rule{ + Chain: kubeServicesChain, + Rule: knftables.Concat( + "fib daddr type local", + noLocalhost, + "jump", kubeNodePortsChain, + ), + Comment: ptr.To("kubernetes service nodeports; NOTE: this must be the last rule in this chain"), + }) } else { nodeIPs, err := proxier.nodePortAddresses.GetNodeIPs(proxier.networkInterfacer) if err != nil { @@ -1313,11 +1354,14 @@ func (proxier *Proxier) syncProxyRules() { } // create nodeport rules for each IP one by one - proxier.natRules.Write( - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, - "-d", ip.String(), - "-j", string(kubeNodePortsChain)) + tx.Add(&knftables.Rule{ + Chain: kubeServicesChain, + Rule: knftables.Concat( + ipX, "daddr", ip, + "jump", kubeNodePortsChain, + ), + Comment: ptr.To("kubernetes service nodeports; NOTE: this must be the last rule in this chain"), + }) } } @@ -1341,13 +1385,24 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Failed to list nftables chains: stale chains will not be deleted") } - metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines())) + // OTOH, we can immediately delete any stale affinity sets + existingSets, err := proxier.nftables.List(context.TODO(), "sets") + if err == nil { + for _, set := range existingSets { + if isAffinitySetName(set) && !activeAffinitySets.Has(set) { + tx.Delete(&knftables.Set{ + Name: set, + }) + } + } + } else if !knftables.IsNotFound(err) { + klog.ErrorS(err, "Failed to list nftables sets: stale affinity sets will not be deleted") + } // Sync rules. klog.V(2).InfoS("Reloading service nftables data", "numServices", len(proxier.svcPortMap), "numEndpoints", totalEndpoints, - "numNATRules", proxier.natRules.Lines(), ) // FIXME @@ -1390,51 +1445,50 @@ func (proxier *Proxier) syncProxyRules() { conntrack.CleanStaleEntries(proxier.ipFamily == v1.IPv6Protocol, proxier.exec, proxier.svcPortMap, serviceUpdateResult, endpointUpdateResult) } -func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain string, endpoints []proxy.Endpoint) { +func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcPortNameString string, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) { // First write session affinity rules, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { + ipX := "ip" + if proxier.ipFamily == v1.IPv6Protocol { + ipX = "ip6" + } + for _, ep := range endpoints { epInfo, ok := ep.(*endpointInfo) if !ok { continue } - comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String()) - proxier.natRules.Write( - "-A", string(svcChain), - "-m", "comment", "--comment", comment, - "-m", "recent", "--name", string(epInfo.chainName), - "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", - "-j", string(epInfo.chainName), - ) + + tx.Add(&knftables.Rule{ + Chain: svcChain, + Rule: knftables.Concat( + ipX, "saddr", "@", epInfo.affinitySetName, + "goto", epInfo.chainName, + ), + }) } } - // Now write loadbalancing rules. - numEndpoints := len(endpoints) + // Now write loadbalancing rule + var elements []string for i, ep := range endpoints { epInfo, ok := ep.(*endpointInfo) if !ok { continue } - comment := fmt.Sprintf(`"%s -> %s"`, svcPortNameString, epInfo.String()) - if i < (numEndpoints - 1) { - // Each rule is a probabilistic match. - proxier.natRules.Write( - "-A", string(svcChain), - "-m", "comment", "--comment", comment, - "-m", "statistic", - "--mode", "random", - "--probability", proxier.probability(numEndpoints-i), - "-j", string(epInfo.chainName), - ) - } else { - // The final (or only if n == 1) rule is a guaranteed match. - proxier.natRules.Write( - "-A", string(svcChain), - "-m", "comment", "--comment", comment, - "-j", string(epInfo.chainName), - ) + elements = append(elements, + strconv.Itoa(i), ":", "goto", epInfo.chainName, + ) + if i != len(endpoints)-1 { + elements = append(elements, ",") } } + tx.Add(&knftables.Rule{ + Chain: svcChain, + Rule: knftables.Concat( + "numgen random mod", len(endpoints), "vmap", + "{", elements, "}", + ), + }) } diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index 660fea1f99e..53ae17eaf7f 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -21,7 +21,6 @@ import ( "net" "reflect" "sort" - "strconv" "strings" "testing" "time" @@ -47,7 +46,6 @@ import ( proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" - iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" fakeexec "k8s.io/utils/exec/testing" netutils "k8s.io/utils/net" @@ -308,24 +306,21 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { nft := knftables.NewFake(nftablesFamily, kubeProxyTable) p := &Proxier{ - ipFamily: ipFamily, - exec: &fakeexec.FakeExec{}, - svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil), - iptables: iptablestest.NewFake(), - nftables: nft, - masqueradeMark: "0x4000", - localDetector: detectLocal, - hostname: testHostname, - serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), - precomputedProbabilities: make([]string, 0, 1001), - natRules: proxyutil.NewLineBuffer(), - nodeIP: netutils.ParseIPSloppy(testNodeIP), - nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil), - networkInterfacer: networkInterfacer, - staleChains: make(map[string]time.Time), + ipFamily: ipFamily, + exec: &fakeexec.FakeExec{}, + svcPortMap: make(proxy.ServicePortMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil), + nftables: nft, + masqueradeMark: "0x4000", + localDetector: detectLocal, + hostname: testHostname, + serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), + nodeIP: netutils.ParseIPSloppy(testNodeIP), + nodePortAddresses: proxyutil.NewNodePortAddresses(ipFamily, nil), + networkInterfacer: networkInterfacer, + staleChains: make(map[string]time.Time), } p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) @@ -533,36 +528,89 @@ func TestOverallNFTablesRules(t *testing.T) { # svc1 add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 } + add chain ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 + add rule ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 ip saddr 10.180.0.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5OJB2KTY-ns1/svc1/tcp/p80__10.180.0.1/80 meta l4proto tcp dnat to 10.180.0.1:80 + + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 # svc2 add chain ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 + add rule ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 ip daddr 172.30.0.42 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-42NFTM6N-ns2/svc2/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 } add chain ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 + add rule ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 ip saddr 10.0.0.0/8 goto service-42NFTM6N-ns2/svc2/tcp/p80 comment "short-circuit pod traffic" + add rule ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 fib saddr type local jump mark-for-masquerade comment "masquerade local traffic" + add rule ip kube-proxy external-42NFTM6N-ns2/svc2/tcp/p80 fib saddr type local goto service-42NFTM6N-ns2/svc2/tcp/p80 comment "short-circuit local traffic" add chain ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 + add rule ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 ip saddr 10.180.0.2 jump mark-for-masquerade + add rule ip kube-proxy endpoint-SGOXE6O3-ns2/svc2/tcp/p80__10.180.0.2/80 meta l4proto tcp dnat to 10.180.0.2:80 + add rule ip kube-proxy services ip daddr 172.30.0.42 tcp dport 80 goto service-42NFTM6N-ns2/svc2/tcp/p80 + add rule ip kube-proxy services ip daddr 192.168.99.22 tcp dport 80 goto external-42NFTM6N-ns2/svc2/tcp/p80 + add rule ip kube-proxy services ip daddr 1.2.3.4 tcp dport 80 goto external-42NFTM6N-ns2/svc2/tcp/p80 add rule ip kube-proxy external-services ip daddr 192.168.99.22 tcp dport 80 drop comment "ns2/svc2:p80 has no local endpoints" add rule ip kube-proxy external-services ip daddr 1.2.3.4 tcp dport 80 drop comment "ns2/svc2:p80 has no local endpoints" add rule ip kube-proxy external-services fib daddr type local tcp dport 3001 drop comment "ns2/svc2:p80 has no local endpoints" + add rule ip kube-proxy nodeports tcp dport 3001 goto external-42NFTM6N-ns2/svc2/tcp/p80 # svc3 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 } add chain ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 jump mark-for-masquerade + add rule ip kube-proxy external-4AT6LBPK-ns3/svc3/tcp/p80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 add chain ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 + add rule ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 ip saddr 10.180.0.3 jump mark-for-masquerade + add rule ip kube-proxy endpoint-UEIP74TE-ns3/svc3/tcp/p80__10.180.0.3/80 meta l4proto tcp dnat to 10.180.0.3:80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy nodeports tcp dport 3003 goto external-4AT6LBPK-ns3/svc3/tcp/p80 # svc4 add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr 172.30.0.44 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 , 1 : goto endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 } add chain ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 jump mark-for-masquerade + add rule ip kube-proxy external-LAUZTJTB-ns4/svc4/tcp/p80 goto service-LAUZTJTB-ns4/svc4/tcp/p80 add chain ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 + add rule ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 ip saddr 10.180.0.5 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5RFCDDV7-ns4/svc4/tcp/p80__10.180.0.5/80 meta l4proto tcp dnat to 10.180.0.5:80 add chain ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 + add rule ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 ip saddr 10.180.0.4 jump mark-for-masquerade + add rule ip kube-proxy endpoint-UNZV3OEC-ns4/svc4/tcp/p80__10.180.0.4/80 meta l4proto tcp dnat to 10.180.0.4:80 + add rule ip kube-proxy services ip daddr 172.30.0.44 tcp dport 80 goto service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy services ip daddr 192.168.99.33 tcp dport 80 goto external-LAUZTJTB-ns4/svc4/tcp/p80 # svc5 + add set ip kube-proxy affinity-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 { type ipv4_addr ; flags dynamic,timeout ; timeout 10800s ; } add chain ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 + add rule ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 ip daddr 172.30.0.45 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 ip saddr @affinity-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 goto endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 + add rule ip kube-proxy service-HVFWP5L3-ns5/svc5/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 } add chain ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 + add rule ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 jump mark-for-masquerade + add rule ip kube-proxy external-HVFWP5L3-ns5/svc5/tcp/p80 goto service-HVFWP5L3-ns5/svc5/tcp/p80 add chain ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 + add rule ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 ip saddr 203.0.113.0/25 goto external-HVFWP5L3-ns5/svc5/tcp/p80 + add rule ip kube-proxy firewall-HVFWP5L3-ns5/svc5/tcp/p80 continue comment "other traffic will be dropped by firewall" + add chain ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 + add rule ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 ip saddr 10.180.0.3 jump mark-for-masquerade + add rule ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 update @affinity-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 { ip saddr } + add rule ip kube-proxy endpoint-GTK6MW7G-ns5/svc5/tcp/p80__10.180.0.3/80 meta l4proto tcp dnat to 10.180.0.3:80 add rule ip kube-proxy firewall ip daddr 5.6.7.8 tcp dport 80 drop comment "ns5/svc5:p80 traffic not accepted by firewall-HVFWP5L3-ns5/svc5/tcp/p80" + add rule ip kube-proxy services ip daddr 172.30.0.45 tcp dport 80 goto service-HVFWP5L3-ns5/svc5/tcp/p80 + add rule ip kube-proxy services ip daddr 5.6.7.8 tcp dport 80 goto firewall-HVFWP5L3-ns5/svc5/tcp/p80 + add rule ip kube-proxy nodeports tcp dport 3002 goto external-HVFWP5L3-ns5/svc5/tcp/p80 # svc6 add rule ip kube-proxy services-filter ip daddr 172.30.0.46 tcp dport 80 reject comment "ns6/svc6:p80 has no endpoints" + + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -1409,38 +1457,6 @@ func TestExternalTrafficPolicyCluster(t *testing.T) { }) } -func TestComputeProbability(t *testing.T) { - expectedProbabilities := map[int]string{ - 1: "1.0000000000", - 2: "0.5000000000", - 10: "0.1000000000", - 100: "0.0100000000", - 1000: "0.0010000000", - 10000: "0.0001000000", - 100000: "0.0000100000", - 100001: "0.0000099999", - } - - for num, expected := range expectedProbabilities { - actual := computeProbability(num) - if actual != expected { - t.Errorf("Expected computeProbability(%d) to be %s, got: %s", num, expected, actual) - } - } - - prevProbability := float64(0) - for i := 100000; i > 1; i-- { - currProbability, err := strconv.ParseFloat(computeProbability(i), 64) - if err != nil { - t.Fatalf("Error parsing float probability for %d: %v", i, err) - } - if currProbability <= prevProbability { - t.Fatalf("Probability unexpectedly <= to previous probability for %d: (%0.10f <= %0.10f)", i, currProbability, prevProbability) - } - prevProbability = currProbability - } -} - func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Service { svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -4319,11 +4335,23 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.syncProxyRules() expected := baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.42 tcp dport 8080 goto service-MHHHYRWA-ns2/svc2/tcp/p8080 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 + add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 } add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 + add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4357,30 +4385,82 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.syncProxyRules() expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.42 tcp dport 8080 goto service-MHHHYRWA-ns2/svc2/tcp/p8080 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 + add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 ip daddr 172.30.0.42 tcp dport 8080 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 numgen random mod 1 vmap { 0 : goto endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 } add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 + add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 ip saddr 10.0.2.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 meta l4proto tcp dnat to 10.0.2.1:8080 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr 10.0.3.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) // Delete a service; its chains will be flushed, but not immediately deleted. fp.OnServiceDelete(svc2) fp.syncProxyRules() + expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } + add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 + + add chain ip kube-proxy service-MHHHYRWA-ns2/svc2/tcp/p8080 + add chain ip kube-proxy endpoint-7RVP4LUQ-ns2/svc2/tcp/p8080__10.0.2.1/8080 + + add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } + add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr 10.0.3.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80 + `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) + // Fake the passage of time and confirm that the stale chains get deleted. ageStaleChains() fp.syncProxyRules() expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr 10.0.3.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4398,11 +4478,23 @@ func TestSyncProxyRulesRepeated(t *testing.T) { ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr 10.0.3.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80 add rule ip kube-proxy services-filter ip daddr 172.30.0.44 tcp dport 80 reject comment "ns4/svc4:p80 has no endpoints" `) @@ -4423,14 +4515,31 @@ func TestSyncProxyRulesRepeated(t *testing.T) { ) fp.syncProxyRules() expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.44 tcp dport 80 goto service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 ip saddr 10.0.3.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 meta l4proto tcp dnat to 10.0.3.1:80 add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr 172.30.0.44 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr 10.0.4.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4442,15 +4551,32 @@ func TestSyncProxyRulesRepeated(t *testing.T) { // The old endpoint chain (for 10.0.3.1) will not be deleted yet. expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.44 tcp dport 80 goto service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 } add chain ip kube-proxy endpoint-2OCDJSZQ-ns3/svc3/tcp/p80__10.0.3.1/80 add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 + add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 ip saddr 10.0.3.2 jump mark-for-masquerade + add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 meta l4proto tcp dnat to 10.0.3.2:80 add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr 172.30.0.44 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr 10.0.4.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4464,15 +4590,34 @@ func TestSyncProxyRulesRepeated(t *testing.T) { fp.syncProxyRules() expected = baseRules + dedent.Dedent(` + add rule ip kube-proxy services ip daddr 172.30.0.41 tcp dport 80 goto service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.43 tcp dport 80 goto service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy services ip daddr 172.30.0.44 tcp dport 80 goto service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy services fib daddr type local ip daddr != 127.0.0.0/8 jump nodeports comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" + add chain ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 ip daddr 172.30.0.41 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-ULMVA6XW-ns1/svc1/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 } add chain ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 ip saddr 10.0.1.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-5TPGNJF2-ns1/svc1/tcp/p80__10.0.1.1/80 meta l4proto tcp dnat to 10.0.1.1:80 add chain ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 ip daddr 172.30.0.43 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-4AT6LBPK-ns3/svc3/tcp/p80 numgen random mod 2 vmap { 0 : goto endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 , 1 : goto endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 } add chain ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 + add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 ip saddr 10.0.3.2 jump mark-for-masquerade + add rule ip kube-proxy endpoint-SWWHDC7X-ns3/svc3/tcp/p80__10.0.3.2/80 meta l4proto tcp dnat to 10.0.3.2:80 add chain ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 + add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 ip saddr 10.0.3.3 jump mark-for-masquerade + add rule ip kube-proxy endpoint-TQ2QKHCZ-ns3/svc3/tcp/p80__10.0.3.3/80 meta l4proto tcp dnat to 10.0.3.3:80 add chain ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 ip daddr 172.30.0.44 tcp dport 80 ip saddr != 10.0.0.0/8 jump mark-for-masquerade + add rule ip kube-proxy service-LAUZTJTB-ns4/svc4/tcp/p80 numgen random mod 1 vmap { 0 : goto endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 } add chain ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 ip saddr 10.0.4.1 jump mark-for-masquerade + add rule ip kube-proxy endpoint-WAHRBT2B-ns4/svc4/tcp/p80__10.0.4.1/80 meta l4proto tcp dnat to 10.0.4.1:80 `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -4709,7 +4854,7 @@ func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, testCase.ipModeEnabled)() - _, fp := NewFakeProxier(v1.IPv4Protocol) + nft, fp := NewFakeProxier(v1.IPv4Protocol) makeServiceMap(fp, makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { svc.Spec.Type = "LoadBalancer" @@ -4743,18 +4888,17 @@ func TestLoadBalancerIngressRouteTypeProxy(t *testing.T) { fp.syncProxyRules() - /* FIXME - c, _ := ipt.Dump.GetChain(utiliptables.TableNAT, kubeServicesChain) + c := nft.Table.Chains[kubeServicesChain] ruleExists := false + destCheck := fmt.Sprintf("ip daddr %s", testCase.svcLBIP) for _, r := range c.Rules { - if r.DestinationAddress != nil && r.DestinationAddress.Value == testCase.svcLBIP { + if strings.HasPrefix(r.Rule, destCheck) { ruleExists = true } } if ruleExists != testCase.expectedRule { t.Errorf("unexpected rule for %s", testCase.svcLBIP) } - */ }) } } diff --git a/pkg/proxy/util/iptables/traffic.go b/pkg/proxy/util/iptables/traffic.go index f27d89e9a57..6c4f54b0c01 100644 --- a/pkg/proxy/util/iptables/traffic.go +++ b/pkg/proxy/util/iptables/traffic.go @@ -33,6 +33,12 @@ type LocalTrafficDetector interface { // IfNotLocal returns iptables arguments that will match traffic that is not from a pod IfNotLocal() []string + + // IfLocalNFT returns nftables arguments that will match traffic from a pod + IfLocalNFT() []string + + // IfNotLocalNFT returns nftables arguments that will match traffic that is not from a pod + IfNotLocalNFT() []string } type noOpLocalDetector struct{} @@ -54,21 +60,39 @@ func (n *noOpLocalDetector) IfNotLocal() []string { return nil // no-op; matches all traffic } +func (n *noOpLocalDetector) IfLocalNFT() []string { + return nil // no-op; matches all traffic +} + +func (n *noOpLocalDetector) IfNotLocalNFT() []string { + return nil // no-op; matches all traffic +} + type detectLocalByCIDR struct { - ifLocal []string - ifNotLocal []string + ifLocal []string + ifNotLocal []string + ifLocalNFT []string + ifNotLocalNFT []string } // NewDetectLocalByCIDR implements the LocalTrafficDetector interface using a CIDR. This can be used when a single CIDR // range can be used to capture the notion of local traffic. func NewDetectLocalByCIDR(cidr string) (LocalTrafficDetector, error) { - _, _, err := netutils.ParseCIDRSloppy(cidr) + _, parsed, err := netutils.ParseCIDRSloppy(cidr) if err != nil { return nil, err } + + nftFamily := "ip" + if netutils.IsIPv6CIDR(parsed) { + nftFamily = "ip6" + } + return &detectLocalByCIDR{ - ifLocal: []string{"-s", cidr}, - ifNotLocal: []string{"!", "-s", cidr}, + ifLocal: []string{"-s", cidr}, + ifNotLocal: []string{"!", "-s", cidr}, + ifLocalNFT: []string{nftFamily, "saddr", cidr}, + ifNotLocalNFT: []string{nftFamily, "saddr", "!=", cidr}, }, nil } @@ -84,9 +108,19 @@ func (d *detectLocalByCIDR) IfNotLocal() []string { return d.ifNotLocal } +func (d *detectLocalByCIDR) IfLocalNFT() []string { + return d.ifLocalNFT +} + +func (d *detectLocalByCIDR) IfNotLocalNFT() []string { + return d.ifNotLocalNFT +} + type detectLocalByBridgeInterface struct { - ifLocal []string - ifNotLocal []string + ifLocal []string + ifNotLocal []string + ifLocalNFT []string + ifNotLocalNFT []string } // NewDetectLocalByBridgeInterface implements the LocalTrafficDetector interface using a bridge interface name. @@ -96,8 +130,10 @@ func NewDetectLocalByBridgeInterface(interfaceName string) (LocalTrafficDetector return nil, fmt.Errorf("no bridge interface name set") } return &detectLocalByBridgeInterface{ - ifLocal: []string{"-i", interfaceName}, - ifNotLocal: []string{"!", "-i", interfaceName}, + ifLocal: []string{"-i", interfaceName}, + ifNotLocal: []string{"!", "-i", interfaceName}, + ifLocalNFT: []string{"iif", interfaceName}, + ifNotLocalNFT: []string{"iif", "!=", interfaceName}, }, nil } @@ -113,9 +149,19 @@ func (d *detectLocalByBridgeInterface) IfNotLocal() []string { return d.ifNotLocal } +func (d *detectLocalByBridgeInterface) IfLocalNFT() []string { + return d.ifLocalNFT +} + +func (d *detectLocalByBridgeInterface) IfNotLocalNFT() []string { + return d.ifNotLocalNFT +} + type detectLocalByInterfaceNamePrefix struct { - ifLocal []string - ifNotLocal []string + ifLocal []string + ifNotLocal []string + ifLocalNFT []string + ifNotLocalNFT []string } // NewDetectLocalByInterfaceNamePrefix implements the LocalTrafficDetector interface using an interface name prefix. @@ -126,8 +172,10 @@ func NewDetectLocalByInterfaceNamePrefix(interfacePrefix string) (LocalTrafficDe return nil, fmt.Errorf("no interface prefix set") } return &detectLocalByInterfaceNamePrefix{ - ifLocal: []string{"-i", interfacePrefix + "+"}, - ifNotLocal: []string{"!", "-i", interfacePrefix + "+"}, + ifLocal: []string{"-i", interfacePrefix + "+"}, + ifNotLocal: []string{"!", "-i", interfacePrefix + "+"}, + ifLocalNFT: []string{"iif", interfacePrefix + "*"}, + ifNotLocalNFT: []string{"iif", "!=", interfacePrefix + "*"}, }, nil } @@ -142,3 +190,11 @@ func (d *detectLocalByInterfaceNamePrefix) IfLocal() []string { func (d *detectLocalByInterfaceNamePrefix) IfNotLocal() []string { return d.ifNotLocal } + +func (d *detectLocalByInterfaceNamePrefix) IfLocalNFT() []string { + return d.ifLocalNFT +} + +func (d *detectLocalByInterfaceNamePrefix) IfNotLocalNFT() []string { + return d.ifNotLocalNFT +}