From 3363ec4ba16651487460bd90fc4bfa6a20b773a6 Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Sat, 27 Apr 2024 12:25:14 +0530 Subject: [PATCH] pkg/proxy/iptables: track ct state invalid dropped packets Track packets dropped by proxy which were marked invalid by conntrack using nfacct netfilter extended accounting infrastructure. Signed-off-by: Daman Arora --- pkg/proxy/iptables/proxier.go | 33 ++++++++++++++++++- pkg/proxy/iptables/proxier_test.go | 51 ++++++++++++++++++++++-------- pkg/proxy/metrics/metrics.go | 48 ++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 14 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 21346207b5c..b639a4a1f93 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/proxy/util/nfacct" "k8s.io/kubernetes/pkg/util/async" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilexec "k8s.io/utils/exec" @@ -167,6 +168,7 @@ type Proxier struct { masqueradeAll bool masqueradeMark string conntrack conntrack.Interface + nfacct nfacct.Interface localDetector proxyutil.LocalTrafficDetector hostname string nodeIP net.IP @@ -208,6 +210,9 @@ type Proxier struct { networkInterfacer proxyutil.NetworkInterfacer logger klog.Logger + + // nfAcctCounters can be used to determine if a counter exists in the nfacct subsystem. 
+ nfAcctCounters map[string]bool } // Proxier implements proxy.Provider @@ -271,6 +276,10 @@ func NewProxier(ctx context.Context, logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) + nfacctRunner, err := nfacct.New() + if err != nil { + logger.Error(err, "Failed to create nfacct runner") + } proxier := &Proxier{ ipFamily: ipFamily, @@ -284,6 +293,7 @@ func NewProxier(ctx context.Context, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, conntrack: conntrack.NewExec(exec), + nfacct: nfacctRunner, localDetector: localDetector, hostname: hostname, nodeIP: nodeIP, @@ -302,6 +312,7 @@ func NewProxier(ctx context.Context, networkInterfacer: proxyutil.RealNetwork{}, conntrackTCPLiberal: conntrackTCPLiberal, logger: logger, + nfAcctCounters: map[string]bool{metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: false}, } burstSyncs := 2 @@ -853,6 +864,18 @@ func (proxier *Proxier) syncProxyRules() { return } } + + // ensure the nfacct counters + if proxier.nfacct != nil { + for name := range proxier.nfAcctCounters { + if err := proxier.nfacct.Ensure(name); err != nil { + proxier.nfAcctCounters[name] = false + proxier.logger.Error(err, "Failed to create nfacct counter; the corresponding metric will not be updated", "counter", name) + } else { + proxier.nfAcctCounters[name] = true + } + } + } } // @@ -1454,12 +1477,20 @@ func (proxier *Proxier) syncProxyRules() { // Ref: https://github.com/kubernetes/kubernetes/issues/74839 // Ref: https://github.com/kubernetes/kubernetes/issues/117924 if !proxier.conntrackTCPLiberal { - proxier.filterRules.Write( + rule := []string{ "-A", string(kubeForwardChain), "-m", "conntrack", "--ctstate", "INVALID", + } + if proxier.nfAcctCounters[metrics.IPTablesCTStateInvalidDroppedNFAcctCounter] { + rule = append(rule, + "-m", "nfacct", "--nfacct-name", 
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter, + ) + } + rule = append(rule, "-j", "DROP", ) + proxier.filterRules.Write(rule) } // If the masqueradeMark has been added then we want to forward that same diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index f618a88e004..fe1c196e02e 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -2544,22 +2544,47 @@ func TestHealthCheckNodePort(t *testing.T) { } func TestDropInvalidRule(t *testing.T) { - for _, tcpLiberal := range []bool{false, true} { - t.Run(fmt.Sprintf("tcpLiberal %t", tcpLiberal), func(t *testing.T) { - ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) - fp.conntrackTCPLiberal = tcpLiberal - fp.syncProxyRules() - - var expected string - if !tcpLiberal { - expected = "-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP" - } - expected += dedent.Dedent(` + kubeForwardChainRules := dedent.Dedent(` -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT - `) + `) + testCases := []struct { + nfacctEnsured bool + tcpLiberal bool + dropRule string + nfAcctCounters map[string]bool + }{ + { + nfacctEnsured: false, + tcpLiberal: false, + nfAcctCounters: map[string]bool{}, + dropRule: "-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP", + }, + { + nfacctEnsured: true, + tcpLiberal: false, + nfAcctCounters: map[string]bool{ + metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true, + }, + dropRule: fmt.Sprintf("-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name %s -j DROP", metrics.IPTablesCTStateInvalidDroppedNFAcctCounter), + }, + { + nfacctEnsured: false, + tcpLiberal: true, + nfAcctCounters: map[string]bool{}, + dropRule: "", + }, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("tcpLiberal is %t and nfacctEnsured 
is %t", tc.tcpLiberal, tc.nfacctEnsured), func(t *testing.T) { + ipt := iptablestest.NewFake() + fp := NewFakeProxier(ipt) + fp.conntrackTCPLiberal = tc.tcpLiberal + fp.nfAcctCounters = tc.nfAcctCounters + fp.syncProxyRules() + + expected := tc.dropRule + kubeForwardChainRules assertIPTablesChainEqual(t, getLine(), utiliptables.TableFilter, kubeForwardChain, expected, fp.iptablesData.String()) }) } diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go index a40cfb9f564..177b69e3d7b 100644 --- a/pkg/proxy/metrics/metrics.go +++ b/pkg/proxy/metrics/metrics.go @@ -22,7 +22,9 @@ import ( "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/klog/v2" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" + "k8s.io/kubernetes/pkg/proxy/util/nfacct" ) const kubeProxySubsystem = "kubeproxy" @@ -139,6 +141,14 @@ var ( }, ) + // iptablesCTStateInvalidDroppedPacketsDescription describes the metric for the number of packets dropped + // by iptables which were marked INVALID by conntrack. + iptablesCTStateInvalidDroppedPacketsDescription = metrics.NewDesc( + "kubeproxy_iptables_ct_state_invalid_dropped_packets_total", + "packets dropped by iptables to work around conntrack problems", + nil, nil, metrics.ALPHA, "") + IPTablesCTStateInvalidDroppedNFAcctCounter = "ct_state_invalid_dropped_pkts" + // IPTablesRestoreFailuresTotal is the number of iptables restore failures that the proxy has // seen. 
IPTablesRestoreFailuresTotal = metrics.NewCounter( @@ -279,6 +289,7 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) { switch mode { case kubeproxyconfig.ProxyModeIPTables: + legacyregistry.CustomMustRegister(newCTStateInvalidPacketsCollector()) legacyregistry.MustRegister(SyncFullProxyRulesLatency) legacyregistry.MustRegister(SyncPartialProxyRulesLatency) legacyregistry.MustRegister(IPTablesRestoreFailuresTotal) @@ -303,3 +314,40 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) { func SinceInSeconds(start time.Time) float64 { return time.Since(start).Seconds() } + +var _ metrics.StableCollector = &ctStateInvalidPacketsCollector{} + +func newCTStateInvalidPacketsCollector() *ctStateInvalidPacketsCollector { + client, err := nfacct.New() + if err != nil { + klog.ErrorS(err, "failed to initialize nfacct client") + } + return &ctStateInvalidPacketsCollector{client: client} +} + +type ctStateInvalidPacketsCollector struct { + metrics.BaseStableCollector + client nfacct.Interface +} + +// DescribeWithStability implements the metrics.StableCollector interface. +func (c *ctStateInvalidPacketsCollector) DescribeWithStability(ch chan<- *metrics.Desc) { + ch <- iptablesCTStateInvalidDroppedPacketsDescription +} + +// CollectWithStability implements the metrics.StableCollector interface. +func (c *ctStateInvalidPacketsCollector) CollectWithStability(ch chan<- metrics.Metric) { + if c.client != nil { + counter, err := c.client.Get(IPTablesCTStateInvalidDroppedNFAcctCounter) + if err != nil { + klog.ErrorS(err, "failed to collect nfacct counter") + } else { + metric, err := metrics.NewConstMetric(iptablesCTStateInvalidDroppedPacketsDescription, metrics.CounterValue, float64(counter.Packets)) + if err != nil { + klog.ErrorS(err, "failed to create constant metric") + } else { + ch <- metric + } + } + } +}