Distinguish iptables-based and nftables-based backends, do startup cleanup

When switching from iptables or ipvs to nftables, clean up old
iptables/ipvs rules. When switching the other way, clean up old
nftables rules.
This commit is contained in:
Dan Winship 2023-10-23 09:38:24 -04:00
parent abb1a458a9
commit 93860a5217
4 changed files with 105 additions and 143 deletions

View File

@ -375,9 +375,12 @@ func (o *Options) Run() error {
return o.writeConfigFile()
}
err := platformCleanup(o.config.Mode, o.CleanupAndExit)
if o.CleanupAndExit {
return cleanupAndExit()
return err
}
// We ignore err otherwise; the cleanup is best-effort, and the backends will have
// logged messages if they failed in interesting ways.
proxyServer, err := newProxyServer(o.config, o.master, o.InitAndExit)
if err != nil {

View File

@ -40,9 +40,11 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
toolswatch "k8s.io/client-go/tools/watch"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/proxy/iptables"
@ -101,60 +103,73 @@ func (s *ProxyServer) platformSetup() error {
return nil
}
// isIPTablesBased checks whether mode is based on iptables rather than nftables
func isIPTablesBased(mode proxyconfigapi.ProxyMode) bool {
return mode == proxyconfigapi.ProxyModeIPTables || mode == proxyconfigapi.ProxyModeIPVS
}
// getIPTables returns an array of [IPv4, IPv6] utiliptables.Interfaces. If primaryFamily
// is not v1.IPFamilyUnknown then it will also separately return the interface for just
// that family.
func getIPTables(primaryFamily v1.IPFamily) ([2]utiliptables.Interface, utiliptables.Interface) {
execer := exec.New()
// Create iptables handlers for both families. Always ordered as IPv4, IPv6
ipt := [2]utiliptables.Interface{
utiliptables.New(execer, utiliptables.ProtocolIPv4),
utiliptables.New(execer, utiliptables.ProtocolIPv6),
}
var iptInterface utiliptables.Interface
if primaryFamily == v1.IPv4Protocol {
iptInterface = ipt[0]
} else if primaryFamily == v1.IPv6Protocol {
iptInterface = ipt[1]
}
return ipt, iptInterface
}
// platformCheckSupported is called immediately before creating the Proxier, to check
// what IP families are supported (and whether the configuration is usable at all).
func (s *ProxyServer) platformCheckSupported() (ipv4Supported, ipv6Supported, dualStackSupported bool, err error) {
execer := exec.New()
ipt := utiliptables.New(execer, utiliptables.ProtocolIPv4)
ipv4Supported = ipt.Present()
ipt = utiliptables.New(execer, utiliptables.ProtocolIPv6)
ipv6Supported = ipt.Present()
if isIPTablesBased(s.Config.Mode) {
ipt, _ := getIPTables(v1.IPFamilyUnknown)
ipv4Supported = ipt[0].Present()
ipv6Supported = ipt[1].Present()
if !ipv4Supported && !ipv6Supported {
err = fmt.Errorf("iptables is not available on this host")
} else if !ipv4Supported {
klog.InfoS("No iptables support for family", "ipFamily", v1.IPv4Protocol)
} else if !ipv6Supported {
klog.InfoS("No iptables support for family", "ipFamily", v1.IPv6Protocol)
}
} else {
// Assume support for both families.
// FIXME: figure out how to check for kernel IPv6 support using nft
ipv4Supported, ipv6Supported = true, true
}
// The Linux proxies can always support dual-stack if they can support both IPv4
// and IPv6.
dualStackSupported = ipv4Supported && ipv6Supported
if !ipv4Supported && !ipv6Supported {
err = fmt.Errorf("iptables is not available on this host")
} else if !ipv4Supported {
klog.InfoS("No iptables support for family", "ipFamily", v1.IPv4Protocol)
} else if !ipv6Supported {
klog.InfoS("No iptables support for family", "ipFamily", v1.IPv6Protocol)
}
return
}
// createProxier creates the proxy.Provider
func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguration, dualStack, initOnly bool) (proxy.Provider, error) {
var proxier proxy.Provider
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
var localDetector proxyutiliptables.LocalTrafficDetector
var err error
primaryProtocol := utiliptables.ProtocolIPv4
if s.PrimaryIPFamily == v1.IPv6Protocol {
primaryProtocol = utiliptables.ProtocolIPv6
}
execer := exec.New()
iptInterface := utiliptables.New(execer, primaryProtocol)
var ipt [2]utiliptables.Interface
// Create iptables handlers for both families, one is already created
// Always ordered as IPv4, IPv6
if primaryProtocol == utiliptables.ProtocolIPv4 {
ipt[0] = iptInterface
ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIPv6)
} else {
ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIPv4)
ipt[1] = iptInterface
}
if config.Mode == proxyconfigapi.ProxyModeIPTables {
klog.InfoS("Using iptables Proxier")
if dualStack {
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
ipt, _ := getIPTables(s.PrimaryIPFamily)
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
@ -164,7 +179,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
proxier, err = iptables.NewDualStackProxier(
ipt,
utilsysctl.New(),
execer,
exec.New(),
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
@ -180,7 +195,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
)
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
var localDetector proxyutiliptables.LocalTrafficDetector
_, iptInterface := getIPTables(s.PrimaryIPFamily)
localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
@ -191,7 +206,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
s.PrimaryIPFamily,
iptInterface,
utilsysctl.New(),
execer,
exec.New(),
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
@ -211,6 +226,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
} else if config.Mode == proxyconfigapi.ProxyModeIPVS {
execer := exec.New()
ipsetInterface := utilipset.New(execer)
ipvsInterface := utilipvs.New()
if err := ipvs.CanUseIPVSProxier(ipvsInterface, ipsetInterface, config.IPVS.Scheduler); err != nil {
@ -219,8 +235,9 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
klog.InfoS("Using ipvs Proxier")
if dualStack {
ipt, _ := getIPTables(s.PrimaryIPFamily)
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
@ -251,7 +268,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
initOnly,
)
} else {
var localDetector proxyutiliptables.LocalTrafficDetector
_, iptInterface := getIPTables(s.PrimaryIPFamily)
localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
@ -290,8 +307,6 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
klog.InfoS("Using nftables Proxier")
if dualStack {
// Always ordered to match []ipt
var localDetectors [2]proxyutiliptables.LocalTrafficDetector
localDetectors, err = getDualStackLocalDetectorTuple(config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
@ -313,8 +328,7 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
initOnly,
)
} else {
// Create a single-stack proxier if and only if the node does not support dual-stack (i.e, no iptables support).
var localDetector proxyutiliptables.LocalTrafficDetector
// Create a single-stack proxier if and only if the node does not support dual-stack
localDetector, err = getLocalDetector(s.PrimaryIPFamily, config.DetectLocalMode, config, s.podCIDRs)
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
@ -531,28 +545,35 @@ func getDualStackLocalDetectorTuple(mode proxyconfigapi.LocalMode, config *proxy
return localDetectors, nil
}
// cleanupAndExit remove iptables rules and ipset/ipvs rules
func cleanupAndExit() error {
execer := exec.New()
// cleanup IPv6 and IPv4 iptables rules, regardless of current configuration
ipts := []utiliptables.Interface{
utiliptables.New(execer, utiliptables.ProtocolIPv4),
utiliptables.New(execer, utiliptables.ProtocolIPv6),
}
ipsetInterface := utilipset.New(execer)
ipvsInterface := utilipvs.New()
// platformCleanup removes stale kube-proxy rules that can be safely removed. If
// cleanupAndExit is true, it will attempt to remove rules from all known kube-proxy
// modes. If it is false, it will only remove rules that are definitely not in use by the
// currently-configured mode.
func platformCleanup(mode proxyconfigapi.ProxyMode, cleanupAndExit bool) error {
var encounteredError bool
for _, ipt := range ipts {
encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError
encounteredError = ipvs.CleanupLeftovers(ipvsInterface, ipt, ipsetInterface) || encounteredError
encounteredError = nftables.CleanupLeftovers(ipt) || encounteredError
// Clean up iptables and ipvs rules if switching to nftables, or if cleanupAndExit
if !isIPTablesBased(mode) || cleanupAndExit {
ipts, _ := getIPTables(v1.IPFamilyUnknown)
execer := exec.New()
ipsetInterface := utilipset.New(execer)
ipvsInterface := utilipvs.New()
for _, ipt := range ipts {
encounteredError = iptables.CleanupLeftovers(ipt) || encounteredError
encounteredError = ipvs.CleanupLeftovers(ipvsInterface, ipt, ipsetInterface) || encounteredError
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.NFTablesProxyMode) {
// Clean up nftables rules when switching to iptables or ipvs, or if cleanupAndExit
if isIPTablesBased(mode) || cleanupAndExit {
encounteredError = nftables.CleanupLeftovers() || encounteredError
}
}
if encounteredError {
return errors.New("encountered an error while tearing down rules")
}
return nil
}

View File

@ -125,7 +125,10 @@ func (s *ProxyServer) createProxier(config *proxyconfigapi.KubeProxyConfiguratio
return proxier, nil
}
// cleanupAndExit cleans up after a previous proxy run
func cleanupAndExit() error {
return errors.New("--cleanup-and-exit is not implemented on Windows")
// platformCleanup removes stale kube-proxy rules that can be safely removed.
func platformCleanup(mode proxyconfigapi.ProxyMode, cleanupAndExit bool) error {
if cleanupAndExit {
return errors.New("--cleanup-and-exit is not implemented on Windows")
}
return nil
}

View File

@ -22,6 +22,7 @@ package nftables
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base32"
"fmt"
@ -342,90 +343,24 @@ var iptablesJumpChains = []iptablesJumpChain{
{utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil},
}
// When chains get removed from iptablesJumpChains, add them here so they get cleaned up
// on upgrade.
var iptablesCleanupOnlyChains = []iptablesJumpChain{}
// CleanupLeftovers removes all iptables rules and chains created by the Proxier
// CleanupLeftovers removes all nftables rules and chains created by the Proxier
// It returns true if an error was encountered. Errors are logged.
func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Unlink our chains
for _, jump := range append(iptablesJumpChains, iptablesCleanupOnlyChains...) {
args := append(jump.extraArgs,
"-m", "comment", "--comment", jump.comment,
"-j", string(jump.dstChain),
)
if err := ipt.DeleteRule(jump.table, jump.srcChain, args...); err != nil {
if !utiliptables.IsNotFoundError(err) {
klog.ErrorS(err, "Error removing pure-iptables proxy rule")
encounteredError = true
}
}
}
func CleanupLeftovers() bool {
var encounteredError bool
// Flush and remove all of our "-t nat" chains.
iptablesData := bytes.NewBuffer(nil)
if err := ipt.SaveInto(utiliptables.TableNAT, iptablesData); err != nil {
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableNAT)
encounteredError = true
} else {
existingNATChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
natChains := proxyutil.NewLineBuffer()
natRules := proxyutil.NewLineBuffer()
natChains.Write("*nat")
// Start with chains we know we need to remove.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} {
if _, found := existingNATChains[chain]; found {
chainString := string(chain)
natChains.Write(utiliptables.MakeChainLine(chain)) // flush
natRules.Write("-X", chainString) // delete
}
for _, family := range []knftables.Family{knftables.IPv4Family, knftables.IPv6Family} {
nft, err := knftables.New(family, kubeProxyTable)
if err == nil {
tx := nft.NewTransaction()
tx.Delete(&knftables.Table{})
err = nft.Run(context.TODO(), tx)
}
// Hunt for service and endpoint chains.
for chain := range existingNATChains {
chainString := string(chain)
if isServiceChainName(chainString) {
natChains.Write(utiliptables.MakeChainLine(chain)) // flush
natRules.Write("-X", chainString) // delete
}
}
natRules.Write("COMMIT")
natLines := append(natChains.Bytes(), natRules.Bytes()...)
// Write it.
err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableNAT)
metrics.IptablesRestoreFailuresTotal.Inc()
if err != nil && !knftables.IsNotFound(err) {
klog.ErrorS(err, "Error cleaning up nftables rules")
encounteredError = true
}
}
// Flush and remove all of our "-t filter" chains.
iptablesData.Reset()
if err := ipt.SaveInto(utiliptables.TableFilter, iptablesData); err != nil {
klog.ErrorS(err, "Failed to execute iptables-save", "table", utiliptables.TableFilter)
encounteredError = true
} else {
existingFilterChains := utiliptables.GetChainsFromTable(iptablesData.Bytes())
filterChains := proxyutil.NewLineBuffer()
filterRules := proxyutil.NewLineBuffer()
filterChains.Write("*filter")
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
if _, found := existingFilterChains[chain]; found {
chainString := string(chain)
filterChains.Write(utiliptables.MakeChainLine(chain))
filterRules.Write("-X", chainString)
}
}
filterRules.Write("COMMIT")
filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
// Write it.
if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil {
klog.ErrorS(err, "Failed to execute iptables-restore", "table", utiliptables.TableFilter)
metrics.IptablesRestoreFailuresTotal.Inc()
encounteredError = true
}
}
return encounteredError
}