From 2735ad541e28525f86413e4ee965fefa510352c3 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 19 May 2023 08:02:46 -0400 Subject: [PATCH] Port table setup/cleanup code to nftables --- pkg/proxy/nftables/proxier.go | 259 ++++++++++++++++------------- pkg/proxy/nftables/proxier_test.go | 73 +++++++- 2 files changed, 215 insertions(+), 117 deletions(-) diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 39419c04dd3..00dc5f3b097 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -60,32 +60,27 @@ import ( ) const ( - // the nftables table + // Our nftables table. All of our chains/sets/maps are created inside this table, + // so they don't need any "kube-" or "kube-proxy-" prefix of their own. kubeProxyTable = "kube-proxy" - // the services chain in the filter table - kubeServicesFilterChain = "KUBE-SERVICES" + // service dispatch + kubeServicesChain = "services" + kubeNodePortsChain = "nodeports" - // the services chain in the NAT table - kubeServicesChain = "KUBE-SERVICES" + // handling for services with no endpoints + kubeServicesFilterChain = "services-filter" + kubeExternalServicesChain = "external-services" - // the external services chain - kubeExternalServicesChain = "KUBE-EXTERNAL-SERVICES" + // LoadBalancerSourceRanges handling + kubeFirewallChain = "firewall" - // the nodeports chain - kubeNodePortsChain = "KUBE-NODEPORTS" + // masquerading + kubeMarkMasqChain = "mark-for-masquerade" + kubeMasqueradingChain = "masquerading" - // the kubernetes postrouting chain - kubePostroutingChain = "KUBE-POSTROUTING" - - // kubeMarkMasqChain is the mark-for-masquerade chain - kubeMarkMasqChain = "KUBE-MARK-MASQ" - - // the kubernetes forward chain - kubeForwardChain = "KUBE-FORWARD" - - // kubeProxyFirewallChain is the kube-proxy firewall chain - kubeProxyFirewallChain = "KUBE-PROXY-FIREWALL" + // chain for special filtering rules + kubeForwardChain = "forward" ) const sysctlNFConntrackTCPBeLiberal = "net/netfilter/nf_conntrack_tcp_be_liberal" @@ -320,32 +315,142 @@ func NewDualStackProxier( return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil } -type iptablesJumpChain struct { - table utiliptables.Table - dstChain utiliptables.Chain - srcChain utiliptables.Chain - comment string - extraArgs []string +// nftablesBaseChains lists our "base chains"; those that are directly connected to the +// netfilter hooks (e.g., "postrouting", "input", etc.), as opposed to "regular" chains, +// which are only run when a rule jumps to them. See +// https://wiki.nftables.org/wiki-nftables/index.php/Configuring_chains. +// +// These are set up from setupNFTables() and then not directly referenced by +// syncProxyRules(). +// +// All of our base chains have names that are just "${type}-${hook}". e.g., "nat-prerouting". +type nftablesBaseChain struct { + name string + chainType knftables.BaseChainType + hook knftables.BaseChainHook + priority knftables.BaseChainPriority } -var iptablesJumpChains = []iptablesJumpChain{ - {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainForward, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableFilter, kubeServicesFilterChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableFilter, kubeServicesFilterChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, - {utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainInput, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainOutput, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableFilter, kubeProxyFirewallChain, utiliptables.ChainForward, "kubernetes load balancer firewall", []string{"-m", "conntrack", "--ctstate", "NEW"}}, - {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil}, - {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil}, - {utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil}, +var nftablesBaseChains = []nftablesBaseChain{ + {"filter-input", knftables.FilterType, knftables.InputHook, knftables.FilterPriority}, + {"filter-forward", knftables.FilterType, knftables.ForwardHook, knftables.FilterPriority}, + {"filter-output", knftables.FilterType, knftables.OutputHook, knftables.FilterPriority}, + {"nat-prerouting", knftables.NATType, knftables.PreroutingHook, knftables.DNATPriority}, + {"nat-output", knftables.NATType, knftables.OutputHook, knftables.DNATPriority}, + {"nat-postrouting", knftables.NATType, knftables.PostroutingHook, knftables.SNATPriority}, +} + +// nftablesJumpChains lists our top-level "regular chains" that are jumped to directly +// from one of the base chains. These are set up from setupNFTables(), and some of them +// are also referenced in syncProxyRules(). +type nftablesJumpChain struct { + dstChain string + srcChain string + extraArgs string +} + +var nftablesJumpChains = []nftablesJumpChain{ + {kubeExternalServicesChain, "filter-input", "ct state new"}, + {kubeExternalServicesChain, "filter-forward", "ct state new"}, + {kubeServicesFilterChain, "filter-forward", "ct state new"}, + {kubeServicesFilterChain, "filter-output", "ct state new"}, + {kubeForwardChain, "filter-forward", ""}, + {kubeFirewallChain, "filter-input", "ct state new"}, + {kubeFirewallChain, "filter-output", "ct state new"}, + {kubeFirewallChain, "filter-forward", "ct state new"}, + + {kubeServicesChain, "nat-output", ""}, + {kubeServicesChain, "nat-prerouting", ""}, + {kubeMasqueradingChain, "nat-postrouting", ""}, +} + +// ensureChain adds commands to tx to ensure that chain exists and doesn't contain +// anything from before this transaction (using createdChains to ensure that we don't +// Flush a chain more than once and lose *new* rules as well.) +func ensureChain(chain string, tx *knftables.Transaction, createdChains sets.Set[string]) { + if createdChains.Has(chain) { + return + } + tx.Add(&knftables.Chain{ + Name: chain, + }) + tx.Flush(&knftables.Chain{ + Name: chain, + }) + createdChains.Insert(chain) } func (proxier *Proxier) setupNFTables(tx *knftables.Transaction) { tx.Add(&knftables.Table{ Comment: ptr.To("rules for kube-proxy"), }) + + // Create and flush base chains + for _, bc := range nftablesBaseChains { + chain := &knftables.Chain{ + Name: bc.name, + Type: ptr.To(bc.chainType), + Hook: ptr.To(bc.hook), + Priority: ptr.To(bc.priority), + } + tx.Add(chain) + tx.Flush(chain) + } + + // Create and flush ordinary chains and add rules jumping to them + createdChains := sets.New[string]() + for _, c := range nftablesJumpChains { + ensureChain(c.dstChain, tx, createdChains) + tx.Add(&knftables.Rule{ + Chain: c.srcChain, + Rule: knftables.Concat( + c.extraArgs, + "jump", c.dstChain, + ), + }) + } + + // Ensure all of our other "top-level" chains exist + for _, chain := range []string{kubeServicesFilterChain, kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain, kubeFirewallChain, kubeMasqueradingChain, kubeMarkMasqChain} { + ensureChain(chain, tx, createdChains) + } + + // Add the rules in the mark-for-masquerade and masquerading chains + tx.Add(&knftables.Rule{ + Chain: kubeMarkMasqChain, + Rule: knftables.Concat( + "mark", "set", "mark", "or", proxier.masqueradeMark, + ), + }) + + tx.Add(&knftables.Rule{ + Chain: kubeMasqueradingChain, + Rule: knftables.Concat( + "mark", "and", proxier.masqueradeMark, "==", "0", + "return", + ), + }) + tx.Add(&knftables.Rule{ + Chain: kubeMasqueradingChain, + Rule: knftables.Concat( + "mark", "set", "mark", "xor", proxier.masqueradeMark, + ), + }) + tx.Add(&knftables.Rule{ + Chain: kubeMasqueradingChain, + Rule: "masquerade fully-random", + }) + + // Drop the packets in INVALID state, which would potentially cause + // unexpected connection reset if nf_conntrack_tcp_be_liberal is not set. + // Ref: https://github.com/kubernetes/kubernetes/issues/74839 + // Ref: https://github.com/kubernetes/kubernetes/issues/117924 + if !proxier.conntrackTCPLiberal { + tx.Add(&knftables.Rule{ + Chain: kubeForwardChain, + Rule: "ct state invalid drop", + }) + } } // CleanupLeftovers removes all nftables rules and chains created by the Proxier @@ -645,6 +750,10 @@ func (proxier *Proxier) syncProxyRules() { return } + // + // Below this point we will not return until we try to write the nftables rules. + // + // Keep track of how long syncs take. start := time.Now() defer func() { @@ -668,29 +777,6 @@ func (proxier *Proxier) syncProxyRules() { tx := proxier.nftables.NewTransaction() proxier.setupNFTables(tx) - // Ensure that our jump rules (eg from PREROUTING to KUBE-SERVICES) exist. - // We can't do this as part of the iptables-restore because we don't want - // to specify/replace *all* of the rules in PREROUTING, etc. - for _, jump := range iptablesJumpChains { - if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil { - klog.ErrorS(err, "Failed to ensure chain exists", "table", jump.table, "chain", jump.dstChain) - return - } - args := jump.extraArgs - if jump.comment != "" { - args = append(args, "-m", "comment", "--comment", jump.comment) - } - args = append(args, "-j", string(jump.dstChain)) - if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil { - klog.ErrorS(err, "Failed to ensure chain jumps", "table", jump.table, "srcChain", jump.srcChain, "dstChain", jump.dstChain) - return - } - } - - // - // Below this point we will not return until we try to write the nftables rules. - // - // Reset all buffers used later. // This is to avoid memory reallocations and thus improve performance. proxier.filterChains.Reset() @@ -698,42 +784,6 @@ func (proxier *Proxier) syncProxyRules() { proxier.natChains.Reset() proxier.natRules.Reset() - // Write chain lines for all the "top-level" chains we'll be filling in - for _, chainName := range []utiliptables.Chain{kubeServicesFilterChain, kubeExternalServicesChain, kubeForwardChain, kubeProxyFirewallChain} { - proxier.filterChains.Write(utiliptables.MakeChainLine(chainName)) - } - for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, kubeMarkMasqChain} { - proxier.natChains.Write(utiliptables.MakeChainLine(chainName)) - } - - // Install the kubernetes-specific postrouting rules. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - - proxier.natRules.Write( - "-A", string(kubePostroutingChain), - "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), - "-j", "RETURN", - ) - // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - proxier.natRules.Write( - "-A", string(kubePostroutingChain), - "-j", "MARK", "--xor-mark", proxier.masqueradeMark, - ) - proxier.natRules.Write( - "-A", string(kubePostroutingChain), - "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, - "-j", "MASQUERADE", "--random-fully", - ) - - // Install the kubernetes-specific masquerade mark rule. We use a whole chain for - // this so that it is easier to flush and change, for example if the mark - // value should ever change. - proxier.natRules.Write( - "-A", string(kubeMarkMasqChain), - "-j", "MARK", "--or-mark", proxier.masqueradeMark, - ) - // Accumulate NAT chains to keep. activeNATChains := sets.New[string]() @@ -935,7 +985,7 @@ func (proxier *Proxier) syncProxyRules() { } if usesFWChain { proxier.filterRules.Write( - "-A", string(kubeProxyFirewallChain), + "-A", string(kubeFirewallChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s traffic not accepted by %s"`, svcPortNameString, svcInfo.firewallChainName), "-m", protocol, "-p", protocol, "-d", lbip, @@ -1235,19 +1285,6 @@ func (proxier *Proxier) syncProxyRules() { } } - // Drop the packets in INVALID state, which would potentially cause - // unexpected connection reset if nf_conntrack_tcp_be_liberal is not set. - // Ref: https://github.com/kubernetes/kubernetes/issues/74839 - // Ref: https://github.com/kubernetes/kubernetes/issues/117924 - if !proxier.conntrackTCPLiberal { - proxier.filterRules.Write( - "-A", string(kubeForwardChain), - "-m", "conntrack", - "--ctstate", "INVALID", - "-j", "DROP", - ) - } - metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(proxier.filterRules.Lines())) metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(proxier.natRules.Lines() - deletedChains)) diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index 273af20385b..45c0b805f31 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -503,6 +503,37 @@ func TestOverallNFTablesRules(t *testing.T) { expected := dedent.Dedent(` add table ip kube-proxy { comment "rules for kube-proxy" ; } + + add chain ip kube-proxy external-services + add chain ip kube-proxy forward + add rule ip kube-proxy forward ct state invalid drop + add chain ip kube-proxy mark-for-masquerade + add rule ip kube-proxy mark-for-masquerade mark set mark or 0x4000 + add chain ip kube-proxy nodeports + add chain ip kube-proxy masquerading + add rule ip kube-proxy masquerading mark and 0x4000 == 0 return + add rule ip kube-proxy masquerading mark set mark xor 0x4000 + add rule ip kube-proxy masquerading masquerade fully-random + add chain ip kube-proxy firewall + add chain ip kube-proxy services + add chain ip kube-proxy services-filter + add chain ip kube-proxy filter-forward { type filter hook forward priority 0 ; } + add rule ip kube-proxy filter-forward ct state new jump external-services + add rule ip kube-proxy filter-forward ct state new jump services-filter + add rule ip kube-proxy filter-forward jump forward + add rule ip kube-proxy filter-forward ct state new jump firewall + add chain ip kube-proxy filter-input { type filter hook input priority 0 ; } + add rule ip kube-proxy filter-input ct state new jump external-services + add rule ip kube-proxy filter-input ct state new jump firewall + add chain ip kube-proxy filter-output { type filter hook output priority 0 ; } + add rule ip kube-proxy filter-output ct state new jump services-filter + add rule ip kube-proxy filter-output ct state new jump firewall + add chain ip kube-proxy nat-output { type nat hook output priority -100 ; } + add rule ip kube-proxy nat-output jump services + add chain ip kube-proxy nat-postrouting { type nat hook postrouting priority 100 ; } + add rule ip kube-proxy nat-postrouting jump masquerading + add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; } + add rule ip kube-proxy nat-prerouting jump services `) assertNFTablesTransactionEqual(t, getLine(), expected, nft.Dump()) @@ -930,7 +961,7 @@ func TestNodePorts(t *testing.T) { // node IP to be accepted allowAltNodeIP bool - // expectFirewall is true if we expect KUBE-FIREWALL to be filled in with + // expectFirewall is true if we expect firewall to be filled in with // an anti-martian-packet rule expectFirewall bool }{ @@ -1100,18 +1131,16 @@ func TestNodePorts(t *testing.T) { func TestDropInvalidRule(t *testing.T) { for _, tcpLiberal := range []bool{false, true} { t.Run(fmt.Sprintf("tcpLiberal %t", tcpLiberal), func(t *testing.T) { - _, fp := NewFakeProxier(v1.IPv4Protocol) + nft, fp := NewFakeProxier(v1.IPv4Protocol) fp.conntrackTCPLiberal = tcpLiberal fp.syncProxyRules() - /* FIXME var expected string if !tcpLiberal { - expected = "-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP\n" + expected = "ct state invalid drop" } - assertIPTablesChainEqual(t, getLine(), utiliptables.TableFilter, kubeForwardChain, expected, fp.iptablesData.String()) - */ + assertNFTablesChainEqual(t, getLine(), nft, kubeForwardChain, expected) }) } } @@ -4166,6 +4195,38 @@ func TestSyncProxyRulesRepeated(t *testing.T) { baseRules := dedent.Dedent(` add table ip kube-proxy { comment "rules for kube-proxy" ; } + + add chain ip kube-proxy external-services + add chain ip kube-proxy filter-forward { type filter hook forward priority 0 ; } + add chain ip kube-proxy filter-input { type filter hook input priority 0 ; } + add chain ip kube-proxy filter-output { type filter hook output priority 0 ; } + add chain ip kube-proxy firewall + add chain ip kube-proxy forward + add chain ip kube-proxy mark-for-masquerade + add chain ip kube-proxy masquerading + add chain ip kube-proxy nat-output { type nat hook output priority -100 ; } + add chain ip kube-proxy nat-postrouting { type nat hook postrouting priority 100 ; } + add chain ip kube-proxy nat-prerouting { type nat hook prerouting priority -100 ; } + add chain ip kube-proxy nodeports + add chain ip kube-proxy services + add chain ip kube-proxy services-filter + + add rule ip kube-proxy filter-forward ct state new jump external-services + add rule ip kube-proxy filter-forward ct state new jump services-filter + add rule ip kube-proxy filter-forward jump forward + add rule ip kube-proxy filter-forward ct state new jump firewall + add rule ip kube-proxy filter-input ct state new jump external-services + add rule ip kube-proxy filter-input ct state new jump firewall + add rule ip kube-proxy filter-output ct state new jump services-filter + add rule ip kube-proxy filter-output ct state new jump firewall + add rule ip kube-proxy forward ct state invalid drop + add rule ip kube-proxy mark-for-masquerade mark set mark or 0x4000 + add rule ip kube-proxy masquerading mark and 0x4000 == 0 return + add rule ip kube-proxy masquerading mark set mark xor 0x4000 + add rule ip kube-proxy masquerading masquerade fully-random + add rule ip kube-proxy nat-output jump services + add rule ip kube-proxy nat-postrouting jump masquerading + add rule ip kube-proxy nat-prerouting jump services `) // Create initial state