pkg/proxy/iptables: track ct state invalid dropped packets

Track the packets that the proxy drops because conntrack marked them INVALID.
The count is kept with nfacct, the netfilter extended accounting infrastructure.

Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Daman Arora 2024-04-27 12:25:14 +05:30
parent 6b5291654f
commit 3363ec4ba1
3 changed files with 118 additions and 14 deletions

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/proxy/metaproxier" "k8s.io/kubernetes/pkg/proxy/metaproxier"
"k8s.io/kubernetes/pkg/proxy/metrics" "k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util" proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/proxy/util/nfacct"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilexec "k8s.io/utils/exec" utilexec "k8s.io/utils/exec"
@ -167,6 +168,7 @@ type Proxier struct {
masqueradeAll bool masqueradeAll bool
masqueradeMark string masqueradeMark string
conntrack conntrack.Interface conntrack conntrack.Interface
nfacct nfacct.Interface
localDetector proxyutil.LocalTrafficDetector localDetector proxyutil.LocalTrafficDetector
hostname string hostname string
nodeIP net.IP nodeIP net.IP
@ -208,6 +210,9 @@ type Proxier struct {
networkInterfacer proxyutil.NetworkInterfacer networkInterfacer proxyutil.NetworkInterfacer
logger klog.Logger logger klog.Logger
// nfAcctCounters can be used to determine if a counter exists in the nfacct subsystem.
nfAcctCounters map[string]bool
} }
// Proxier implements proxy.Provider // Proxier implements proxy.Provider
@ -271,6 +276,10 @@ func NewProxier(ctx context.Context,
logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark) logger.V(2).Info("Using iptables mark for masquerade", "mark", masqueradeMark)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
nfacctRunner, err := nfacct.New()
if err != nil {
logger.Error(err, "Failed to create nfacct runner")
}
proxier := &Proxier{ proxier := &Proxier{
ipFamily: ipFamily, ipFamily: ipFamily,
@ -284,6 +293,7 @@ func NewProxier(ctx context.Context,
masqueradeAll: masqueradeAll, masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark, masqueradeMark: masqueradeMark,
conntrack: conntrack.NewExec(exec), conntrack: conntrack.NewExec(exec),
nfacct: nfacctRunner,
localDetector: localDetector, localDetector: localDetector,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
@ -302,6 +312,7 @@ func NewProxier(ctx context.Context,
networkInterfacer: proxyutil.RealNetwork{}, networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal, conntrackTCPLiberal: conntrackTCPLiberal,
logger: logger, logger: logger,
nfAcctCounters: map[string]bool{metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: false},
} }
burstSyncs := 2 burstSyncs := 2
@ -853,6 +864,18 @@ func (proxier *Proxier) syncProxyRules() {
return return
} }
} }
// ensure the nfacct counters
if proxier.nfacct != nil {
for name := range proxier.nfAcctCounters {
if err := proxier.nfacct.Ensure(name); err != nil {
proxier.nfAcctCounters[name] = false
proxier.logger.Error(err, "Failed to create nfacct counter; the corresponding metric will not be updated", "counter", name)
} else {
proxier.nfAcctCounters[name] = true
}
}
}
} }
// //
@ -1454,12 +1477,20 @@ func (proxier *Proxier) syncProxyRules() {
// Ref: https://github.com/kubernetes/kubernetes/issues/74839 // Ref: https://github.com/kubernetes/kubernetes/issues/74839
// Ref: https://github.com/kubernetes/kubernetes/issues/117924 // Ref: https://github.com/kubernetes/kubernetes/issues/117924
if !proxier.conntrackTCPLiberal { if !proxier.conntrackTCPLiberal {
proxier.filterRules.Write( rule := []string{
"-A", string(kubeForwardChain), "-A", string(kubeForwardChain),
"-m", "conntrack", "-m", "conntrack",
"--ctstate", "INVALID", "--ctstate", "INVALID",
}
if proxier.nfAcctCounters[metrics.IPTablesCTStateInvalidDroppedNFAcctCounter] {
rule = append(rule,
"-m", "nfacct", "--nfacct-name", metrics.IPTablesCTStateInvalidDroppedNFAcctCounter,
)
}
rule = append(rule,
"-j", "DROP", "-j", "DROP",
) )
proxier.filterRules.Write(rule)
} }
// If the masqueradeMark has been added then we want to forward that same // If the masqueradeMark has been added then we want to forward that same

View File

@ -2544,22 +2544,47 @@ func TestHealthCheckNodePort(t *testing.T) {
} }
func TestDropInvalidRule(t *testing.T) { func TestDropInvalidRule(t *testing.T) {
for _, tcpLiberal := range []bool{false, true} { kubeForwardChainRules := dedent.Dedent(`
t.Run(fmt.Sprintf("tcpLiberal %t", tcpLiberal), func(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.conntrackTCPLiberal = tcpLiberal
fp.syncProxyRules()
var expected string
if !tcpLiberal {
expected = "-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP"
}
expected += dedent.Dedent(`
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
`) `)
testCases := []struct {
nfacctEnsured bool
tcpLiberal bool
dropRule string
nfAcctCounters map[string]bool
}{
{
nfacctEnsured: false,
tcpLiberal: false,
nfAcctCounters: map[string]bool{},
dropRule: "-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP",
},
{
nfacctEnsured: true,
tcpLiberal: false,
nfAcctCounters: map[string]bool{
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true,
},
dropRule: fmt.Sprintf("-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name %s -j DROP", metrics.IPTablesCTStateInvalidDroppedNFAcctCounter),
},
{
nfacctEnsured: false,
tcpLiberal: true,
nfAcctCounters: map[string]bool{},
dropRule: "",
},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("tcpLiberal is %t and nfacctEnsured is %t", tc.tcpLiberal, tc.nfacctEnsured), func(t *testing.T) {
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.conntrackTCPLiberal = tc.tcpLiberal
fp.nfAcctCounters = tc.nfAcctCounters
fp.syncProxyRules()
expected := tc.dropRule + kubeForwardChainRules
assertIPTablesChainEqual(t, getLine(), utiliptables.TableFilter, kubeForwardChain, expected, fp.iptablesData.String()) assertIPTablesChainEqual(t, getLine(), utiliptables.TableFilter, kubeForwardChain, expected, fp.iptablesData.String())
}) })
} }

View File

@ -22,7 +22,9 @@ import (
"k8s.io/component-base/metrics" "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config" kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/pkg/proxy/util/nfacct"
) )
const kubeProxySubsystem = "kubeproxy" const kubeProxySubsystem = "kubeproxy"
@ -139,6 +141,14 @@ var (
}, },
) )
// iptablesCTStateInvalidDroppedPacketsDescription describes the metric for the number of packets dropped
// by iptables which were marked INVALID by conntrack.
iptablesCTStateInvalidDroppedPacketsDescription = metrics.NewDesc(
"kubeproxy_iptables_ct_state_invalid_dropped_packets_total",
"packets dropped by iptables to work around conntrack problems",
nil, nil, metrics.ALPHA, "")
IPTablesCTStateInvalidDroppedNFAcctCounter = "ct_state_invalid_dropped_pkts"
// IPTablesRestoreFailuresTotal is the number of iptables restore failures that the proxy has // IPTablesRestoreFailuresTotal is the number of iptables restore failures that the proxy has
// seen. // seen.
IPTablesRestoreFailuresTotal = metrics.NewCounter( IPTablesRestoreFailuresTotal = metrics.NewCounter(
@ -279,6 +289,7 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) {
switch mode { switch mode {
case kubeproxyconfig.ProxyModeIPTables: case kubeproxyconfig.ProxyModeIPTables:
legacyregistry.CustomMustRegister(newCTStateInvalidPacketsCollector())
legacyregistry.MustRegister(SyncFullProxyRulesLatency) legacyregistry.MustRegister(SyncFullProxyRulesLatency)
legacyregistry.MustRegister(SyncPartialProxyRulesLatency) legacyregistry.MustRegister(SyncPartialProxyRulesLatency)
legacyregistry.MustRegister(IPTablesRestoreFailuresTotal) legacyregistry.MustRegister(IPTablesRestoreFailuresTotal)
@ -303,3 +314,40 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) {
func SinceInSeconds(start time.Time) float64 { func SinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds() return time.Since(start).Seconds()
} }
// Compile-time assertion that *ctStateInvalidPacketsCollector satisfies metrics.StableCollector.
var _ metrics.StableCollector = (*ctStateInvalidPacketsCollector)(nil)
// newCTStateInvalidPacketsCollector returns a collector backed by whatever
// client nfacct.New yields. An error from nfacct.New is logged and does not
// prevent the collector from being returned.
func newCTStateInvalidPacketsCollector() *ctStateInvalidPacketsCollector {
	collector := &ctStateInvalidPacketsCollector{}

	var err error
	if collector.client, err = nfacct.New(); err != nil {
		klog.ErrorS(err, "failed to initialize nfacct client")
	}
	return collector
}
// ctStateInvalidPacketsCollector is a metrics.StableCollector that reports the
// packet count of the nfacct counter named by IPTablesCTStateInvalidDroppedNFAcctCounter.
type ctStateInvalidPacketsCollector struct {
// BaseStableCollector is embedded; this type supplies its own
// DescribeWithStability and CollectWithStability methods.
metrics.BaseStableCollector
// client is used to read the nfacct counter; collection is skipped while it is nil.
client nfacct.Interface
}
// DescribeWithStability implements the metrics.StableCollector interface.
// It sends the single descriptor exposed by this collector,
// iptablesCTStateInvalidDroppedPacketsDescription, on ch.
func (c *ctStateInvalidPacketsCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- iptablesCTStateInvalidDroppedPacketsDescription
}
// CollectWithStability implements the metrics.StableCollector interface.
//
// It looks up the nfacct counter named by IPTablesCTStateInvalidDroppedNFAcctCounter
// and sends its packet count on ch as a constant counter metric. Nothing is
// sent when the collector has a nil client, or when the counter lookup or the
// metric construction fails; those failures are only logged.
func (c *ctStateInvalidPacketsCollector) CollectWithStability(ch chan<- metrics.Metric) {
	// Without a client there is nothing to read from.
	if c.client == nil {
		return
	}

	nfCounter, getErr := c.client.Get(IPTablesCTStateInvalidDroppedNFAcctCounter)
	if getErr != nil {
		klog.ErrorS(getErr, "failed to collect nfacct counter")
		return
	}

	packets := float64(nfCounter.Packets)
	constMetric, buildErr := metrics.NewConstMetric(iptablesCTStateInvalidDroppedPacketsDescription, metrics.CounterValue, packets)
	if buildErr != nil {
		klog.ErrorS(buildErr, "failed to create constant metric")
		return
	}
	ch <- constMetric
}