From f7fae7297ce322ea3a20970b3617af5a07d40613 Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Tue, 21 May 2024 15:22:43 +0530 Subject: [PATCH 1/5] pkg/proxy/metrics: refactor nfacct metrics collection Signed-off-by: Daman Arora --- pkg/proxy/metrics/metrics.go | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go index 177b69e3d7b..41377b666a4 100644 --- a/pkg/proxy/metrics/metrics.go +++ b/pkg/proxy/metrics/metrics.go @@ -147,7 +147,8 @@ var ( "kubeproxy_iptables_ct_state_invalid_dropped_packets_total", "packets dropped by iptables to work around conntrack problems", nil, nil, metrics.ALPHA, "") - IPTablesCTStateInvalidDroppedNFAcctCounter = "ct_state_invalid_dropped_pkts" + IPTablesCTStateInvalidDroppedNFAcctCounter = "ct_state_invalid_dropped_pkts" + iptablesCTStateInvalidDroppedMetricCollector = newNFAcctMetricCollector(IPTablesCTStateInvalidDroppedNFAcctCounter, iptablesCTStateInvalidDroppedPacketsDescription) // IPTablesRestoreFailuresTotal is the number of iptables restore failures that the proxy has // seen. @@ -289,7 +290,7 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) { switch mode { case kubeproxyconfig.ProxyModeIPTables: - legacyregistry.CustomMustRegister(newCTStateInvalidPacketsCollector()) + legacyregistry.CustomMustRegister(iptablesCTStateInvalidDroppedMetricCollector) legacyregistry.MustRegister(SyncFullProxyRulesLatency) legacyregistry.MustRegister(SyncPartialProxyRulesLatency) legacyregistry.MustRegister(IPTablesRestoreFailuresTotal) @@ -315,34 +316,40 @@ func SinceInSeconds(start time.Time) float64 { return time.Since(start).Seconds() } -var _ metrics.StableCollector = &ctStateInvalidPacketsCollector{} +var _ metrics.StableCollector = &nfacctMetricCollector{} -func newCTStateInvalidPacketsCollector() *ctStateInvalidPacketsCollector { +func newNFAcctMetricCollector(counter string, description *metrics.Desc) *nfacctMetricCollector { client, err := nfacct.New() if err != nil { klog.ErrorS(err, "failed to initialize nfacct client") } - return &ctStateInvalidPacketsCollector{client: client} + return &nfacctMetricCollector{ + client: client, + counter: counter, + description: description, + } } -type ctStateInvalidPacketsCollector struct { +type nfacctMetricCollector struct { metrics.BaseStableCollector - client nfacct.Interface + client nfacct.Interface + counter string + description *metrics.Desc } // DescribeWithStability implements the metrics.StableCollector interface. -func (c *ctStateInvalidPacketsCollector) DescribeWithStability(ch chan<- *metrics.Desc) { - ch <- iptablesCTStateInvalidDroppedPacketsDescription +func (n *nfacctMetricCollector) DescribeWithStability(ch chan<- *metrics.Desc) { + ch <- n.description } // CollectWithStability implements the metrics.StableCollector interface. -func (c *ctStateInvalidPacketsCollector) CollectWithStability(ch chan<- metrics.Metric) { - if c.client != nil { - counter, err := c.client.Get(IPTablesCTStateInvalidDroppedNFAcctCounter) +func (n *nfacctMetricCollector) CollectWithStability(ch chan<- metrics.Metric) { + if n.client != nil { + counter, err := n.client.Get(n.counter) if err != nil { - klog.ErrorS(err, "failed to collect nfacct counter") + klog.ErrorS(err, "failed to collect nfacct counter", "counter", n.counter) } else { - metric, err := metrics.NewConstMetric(iptablesCTStateInvalidDroppedPacketsDescription, metrics.CounterValue, float64(counter.Packets)) + metric, err := metrics.NewConstMetric(n.description, metrics.CounterValue, float64(counter.Packets)) if err != nil { klog.ErrorS(err, "failed to create constant metric") } else { From 985d64cdbe799a3462407e73d3ea33071d64d0ab Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Tue, 21 May 2024 19:49:47 +0530 Subject: [PATCH 2/5] add ct_state_invalid_dropped_pkts nfacct counter to unit tests Signed-off-by: Daman Arora --- pkg/proxy/iptables/proxier_test.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index fe1c196e02e..2f3ac06b857 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -135,6 +135,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { localhostNodePorts: true, nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil), networkInterfacer: networkInterfacer, + nfAcctCounters: map[string]bool{ + metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true, + }, } p.setInitialized(true) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) @@ -1717,7 +1720,7 @@ func TestOverallIPTablesRules(t *testing.T) { -A KUBE-EXTERNAL-SERVICES -m comment --comment "ns2/svc2:p80 has no local endpoints" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j DROP -A KUBE-EXTERNAL-SERVICES -m comment --comment "ns2/svc2:p80 has no local endpoints" -m addrtype --dst-type LOCAL -m tcp -p tcp --dport 3001 -j DROP -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT -A KUBE-PROXY-FIREWALL -m comment --comment "ns5/svc5:p80 traffic not accepted by KUBE-FW-NUKIZ6OKUXPJNT4C" -m tcp -p tcp -d 5.6.7.8 --dport 80 -j DROP @@ -5916,7 +5919,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -5999,7 +6002,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6053,7 +6056,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6117,7 +6120,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 has no endpoints" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j REJECT -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6173,7 +6176,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6230,7 +6233,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6286,7 +6289,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6344,7 +6347,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT @@ -6435,7 +6438,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) { :KUBE-FORWARD - [0:0] :KUBE-PROXY-FIREWALL - [0:0] -A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP - -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP + -A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT COMMIT From bc8b90b07d73f722dce3b80e799e903bb87c53be Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Tue, 21 May 2024 16:03:36 +0530 Subject: [PATCH 3/5] pkg/proxy: track localhost nodeport accepted packets Signed-off-by: Daman Arora --- pkg/proxy/iptables/number_generated_rules_test.go | 12 ++++++------ pkg/proxy/iptables/proxier.go | 15 ++++++++++++++- pkg/proxy/iptables/proxier_test.go | 5 +++++ pkg/proxy/metrics/metrics.go | 10 ++++++++++ 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/pkg/proxy/iptables/number_generated_rules_test.go b/pkg/proxy/iptables/number_generated_rules_test.go index 959b3f165cb..dd318d20814 100644 --- a/pkg/proxy/iptables/number_generated_rules_test.go +++ b/pkg/proxy/iptables/number_generated_rules_test.go @@ -162,7 +162,7 @@ func TestNumberIptablesRules(t *testing.T) { services: 1, epPerService: 1, expectedFilterRules: 5, - expectedNatRules: 17, + expectedNatRules: 18, }, { name: "1 Services 2 EndpointPerService - LoadBalancer", @@ -177,7 +177,7 @@ func TestNumberIptablesRules(t *testing.T) { services: 1, epPerService: 2, expectedFilterRules: 5, - expectedNatRules: 20, + expectedNatRules: 21, }, { name: "1 Services 10 EndpointPerService - LoadBalancer", @@ -192,7 +192,7 @@ func TestNumberIptablesRules(t *testing.T) { services: 1, epPerService: 10, expectedFilterRules: 5, - expectedNatRules: 44, + expectedNatRules: 45, }, { name: "10 Services 0 EndpointsPerService - LoadBalancer", @@ -222,7 +222,7 @@ func TestNumberIptablesRules(t *testing.T) { services: 10, epPerService: 1, expectedFilterRules: 14, - expectedNatRules: 125, + expectedNatRules: 135, }, { name: "10 Services 2 EndpointPerService - LoadBalancer", @@ -237,7 +237,7 @@ func TestNumberIptablesRules(t *testing.T) { services: 10, epPerService: 2, expectedFilterRules: 14, - expectedNatRules: 155, + expectedNatRules: 165, }, { name: "10 Services 10 EndpointPerService - LoadBalancer", @@ -252,7 +252,7 @@ func TestNumberIptablesRules(t *testing.T) { services: 10, epPerService: 10, expectedFilterRules: 14, - expectedNatRules: 395, + expectedNatRules: 405, }, } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 2f3862f5799..d286c84323b 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -314,7 +314,10 @@ func NewProxier(ctx context.Context, networkInterfacer: proxyutil.RealNetwork{}, conntrackTCPLiberal: conntrackTCPLiberal, logger: logger, - nfAcctCounters: map[string]bool{metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: false}, + nfAcctCounters: map[string]bool{ + metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: false, + metrics.LocalhostNodePortAcceptedNFAcctCounter: false, + }, } burstSyncs := 2 @@ -1183,6 +1186,16 @@ func (proxier *Proxier) syncProxyRules() { // Jump to the external destination chain. For better or for // worse, nodeports are not subect to loadBalancerSourceRanges, // and we can't change that. + if proxier.localhostNodePorts && proxier.ipFamily == v1.IPv4Protocol && proxier.nfAcctCounters[metrics.LocalhostNodePortAcceptedNFAcctCounter] { + natRules.Write( + "-A", string(kubeNodePortsChain), + "-m", "comment", "--comment", svcPortNameString, + "-m", protocol, "-p", protocol, + "-d", "127.0.0.0/8", + "--dport", strconv.Itoa(svcInfo.NodePort()), + "-m", "nfacct", "--nfacct-name", metrics.LocalhostNodePortAcceptedNFAcctCounter, + "-j", string(externalTrafficChain)) + } natRules.Write( "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", svcPortNameString, diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 2f3ac06b857..8323922fb1d 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -113,6 +113,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { networkInterfacer.AddInterfaceAddr(&itf1, addrs1) p := &Proxier{ + ipFamily: ipfamily, svcPortMap: make(proxy.ServicePortMap), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), endpointsMap: make(proxy.EndpointsMap), @@ -137,6 +138,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { networkInterfacer: networkInterfacer, nfAcctCounters: map[string]bool{ metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true, + metrics.LocalhostNodePortAcceptedNFAcctCounter: true, }, } p.setInitialized(true) @@ -1746,8 +1748,11 @@ func TestOverallIPTablesRules(t *testing.T) { :KUBE-SVC-NUKIZ6OKUXPJNT4C - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] + -A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp -d 127.0.0.0/8 --dport 3001 -m nfacct --nfacct-name localhost_nps_accepted_pkts -j KUBE-EXT-GNZBNJ2PO5MGZ6GT -A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp --dport 3001 -j KUBE-EXT-GNZBNJ2PO5MGZ6GT + -A KUBE-NODEPORTS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -d 127.0.0.0/8 --dport 3003 -m nfacct --nfacct-name localhost_nps_accepted_pkts -j KUBE-EXT-X27LE4BHSL4DOUIK -A KUBE-NODEPORTS -m comment --comment ns3/svc3:p80 -m tcp -p tcp --dport 3003 -j KUBE-EXT-X27LE4BHSL4DOUIK + -A KUBE-NODEPORTS -m comment --comment ns5/svc5:p80 -m tcp -p tcp -d 127.0.0.0/8 --dport 3002 -m nfacct --nfacct-name localhost_nps_accepted_pkts -j KUBE-EXT-NUKIZ6OKUXPJNT4C -A KUBE-NODEPORTS -m comment --comment ns5/svc5:p80 -m tcp -p tcp --dport 3002 -j KUBE-EXT-NUKIZ6OKUXPJNT4C -A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-SERVICES -m comment --comment "ns2/svc2:p80 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 80 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT diff --git a/pkg/proxy/metrics/metrics.go b/pkg/proxy/metrics/metrics.go index 41377b666a4..15682f25eaa 100644 --- a/pkg/proxy/metrics/metrics.go +++ b/pkg/proxy/metrics/metrics.go @@ -266,6 +266,15 @@ var ( }, []string{"traffic_policy"}, ) + + // localhostNodePortsAcceptedPacketsDescription describe the metrics for the number of packets accepted + // by iptables which were destined for nodeports on loopback interface. + localhostNodePortsAcceptedPacketsDescription = metrics.NewDesc( + "kubeproxy_iptables_localhost_nodeports_accepted_packets_total", + "Number of packets accepted on nodeports of loopback interface", + nil, nil, metrics.ALPHA, "") + LocalhostNodePortAcceptedNFAcctCounter = "localhost_nps_accepted_pkts" + localhostNodePortsAcceptedMetricsCollector = newNFAcctMetricCollector(LocalhostNodePortAcceptedNFAcctCounter, localhostNodePortsAcceptedPacketsDescription) ) var registerMetricsOnce sync.Once @@ -291,6 +300,7 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) { switch mode { case kubeproxyconfig.ProxyModeIPTables: legacyregistry.CustomMustRegister(iptablesCTStateInvalidDroppedMetricCollector) + legacyregistry.CustomMustRegister(localhostNodePortsAcceptedMetricsCollector) legacyregistry.MustRegister(SyncFullProxyRulesLatency) legacyregistry.MustRegister(SyncPartialProxyRulesLatency) legacyregistry.MustRegister(IPTablesRestoreFailuresTotal) From 9b9a3d780e627859c328efa49fd0205bf3a7f334 Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Tue, 21 May 2024 22:41:23 +0530 Subject: [PATCH 4/5] test/e2e: add kube-proxy metrics grabber Signed-off-by: Daman Arora --- .../framework/metrics/kube_proxy_metrics.go | 42 +++++++++++++++++ test/e2e/framework/metrics/metrics_grabber.go | 45 ++++++++++++++++++- 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 test/e2e/framework/metrics/kube_proxy_metrics.go diff --git a/test/e2e/framework/metrics/kube_proxy_metrics.go b/test/e2e/framework/metrics/kube_proxy_metrics.go new file mode 100644 index 00000000000..66cdada9d62 --- /dev/null +++ b/test/e2e/framework/metrics/kube_proxy_metrics.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "k8s.io/component-base/metrics/testutil" +) + +// KubeProxyMetrics is metrics for kube-proxy +type KubeProxyMetrics testutil.Metrics + +// GetCounterMetricValue returns value for metric type counter. +func (m *KubeProxyMetrics) GetCounterMetricValue(metricName string) float64 { + return float64(testutil.Metrics(*m)[metricName][0].Value) +} + +func newKubeProxyMetricsMetrics() KubeProxyMetrics { + result := testutil.NewMetrics() + return KubeProxyMetrics(result) +} + +func parseKubeProxyMetrics(data string) (KubeProxyMetrics, error) { + result := newKubeProxyMetricsMetrics() + if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil { + return KubeProxyMetrics{}, err + } + return result, nil +} diff --git a/test/e2e/framework/metrics/metrics_grabber.go b/test/e2e/framework/metrics/metrics_grabber.go index 76864526887..9919cccd9df 100644 --- a/test/e2e/framework/metrics/metrics_grabber.go +++ b/test/e2e/framework/metrics/metrics_grabber.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "regexp" + "strconv" "sync" "time" @@ -32,8 +33,9 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" - + "k8s.io/kubernetes/test/e2e/framework" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" ) const ( @@ -43,6 +45,8 @@ const ( kubeControllerManagerPort = 10257 // snapshotControllerPort is the port for the snapshot controller snapshotControllerPort = 9102 + // kubeProxyPort is the default port for the kube-proxy status server. + kubeProxyPort = 10249 ) // MetricsGrabbingDisabledError is an error that is wrapped by the @@ -233,6 +237,45 @@ func (g *Grabber) getMetricsFromNode(ctx context.Context, nodeName string, kubel } } +// GrabFromKubeProxy returns metrics from kube-proxy +func (g *Grabber) GrabFromKubeProxy(ctx context.Context, nodeName string) (KubeProxyMetrics, error) { + nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{"metadata.name": nodeName}.AsSelector().String()}) + if err != nil { + return KubeProxyMetrics{}, err + } + + if len(nodes.Items) != 1 { + return KubeProxyMetrics{}, fmt.Errorf("error listing nodes with name %v, got %v", nodeName, nodes.Items) + } + output, err := g.grabFromKubeProxy(ctx, nodeName) + if err != nil { + return KubeProxyMetrics{}, err + } + return parseKubeProxyMetrics(output) +} + +func (g *Grabber) grabFromKubeProxy(ctx context.Context, nodeName string) (string, error) { + hostCmdPodName := fmt.Sprintf("grab-kube-proxy-metrics-%s", framework.RandomSuffix()) + hostCmdPod := e2epod.NewExecPodSpec(metav1.NamespaceSystem, hostCmdPodName, true) + nodeSelection := e2epod.NodeSelection{Name: nodeName} + e2epod.SetNodeSelection(&hostCmdPod.Spec, nodeSelection) + if _, err := g.client.CoreV1().Pods(metav1.NamespaceSystem).Create(ctx, hostCmdPod, metav1.CreateOptions{}); err != nil { + return "", fmt.Errorf("failed to create pod to fetch metrics: %w", err) + } + if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, hostCmdPodName, metav1.NamespaceSystem, 5*time.Minute); err != nil { + return "", fmt.Errorf("error waiting for pod to be up: %w", err) + } + + host := "127.0.0.1" + if framework.TestContext.ClusterIsIPv6() { + host = "::1" + } + + stdout, err := e2epodoutput.RunHostCmd(metav1.NamespaceSystem, hostCmdPodName, fmt.Sprintf("curl --silent %s/metrics", net.JoinHostPort(host, strconv.Itoa(kubeProxyPort)))) + _ = g.client.CoreV1().Pods(metav1.NamespaceSystem).Delete(ctx, hostCmdPodName, metav1.DeleteOptions{}) + return stdout, err +} + // GrabFromScheduler returns metrics from scheduler func (g *Grabber) GrabFromScheduler(ctx context.Context) (SchedulerMetrics, error) { if !g.grabFromScheduler { From 2e669de45858be6a55b59fc04d2c547f2f121888 Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Tue, 21 May 2024 22:46:50 +0530 Subject: [PATCH 5/5] test/e2e/network: test for localhost nodeport metric update Signed-off-by: Daman Arora --- test/e2e/network/kube_proxy.go | 128 +++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 7 deletions(-) diff --git a/test/e2e/network/kube_proxy.go b/test/e2e/network/kube_proxy.go index 81a177517a2..3435122d942 100644 --- a/test/e2e/network/kube_proxy.go +++ b/test/e2e/network/kube_proxy.go @@ -25,22 +25,25 @@ import ( "strings" "time" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" - + "k8s.io/kubernetes/pkg/cluster/ports" + "k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/test/e2e/framework" + e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" + e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "k8s.io/kubernetes/test/e2e/network/common" imageutils "k8s.io/kubernetes/test/utils/image" admissionapi "k8s.io/pod-security-admission/api" netutils "k8s.io/utils/net" - - "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" ) var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost) @@ -220,7 +223,7 @@ var _ = common.SIGDescribe("KubeProxy", func() { "| grep -m 1 'CLOSE_WAIT.*dport=%v' ", ipFamily, ip, testDaemonTCPPort) if err := wait.PollImmediate(2*time.Second, epsilonSeconds*time.Second, func() (bool, error) { - result, err := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd) + result, err := e2epodoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd) // retry if we can't obtain the conntrack entry if err != nil { framework.Logf("failed to obtain conntrack entry: %v %v", result, err) @@ -242,7 +245,7 @@ var _ = common.SIGDescribe("KubeProxy", func() { return false, fmt.Errorf("wrong TCP CLOSE_WAIT timeout: %v expected: %v", timeoutSeconds, expectedTimeoutSeconds) }); err != nil { // Dump all conntrack entries for debugging - result, err2 := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L") + result, err2 := e2epodoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L") if err2 != nil { framework.Logf("failed to obtain conntrack entry: %v %v", result, err2) } @@ -251,4 +254,115 @@ var _ = common.SIGDescribe("KubeProxy", func() { } }) + ginkgo.It("should update metric for tracking accepted packets destined for localhost nodeports", func(ctx context.Context) { + if framework.TestContext.ClusterIsIPv6() { + e2eskipper.Skipf("test requires IPv4 cluster") + } + + cs := fr.ClientSet + ns := fr.Namespace.Name + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 1) + framework.ExpectNoError(err) + if len(nodes.Items) < 1 { + e2eskipper.Skipf( + "Test requires >= 1 Ready nodes, but there are only %v nodes", + len(nodes.Items)) + } + nodeName := nodes.Items[0].Name + + metricName := "kubeproxy_iptables_localhost_nodeports_accepted_packets_total" + metricsGrabber, err := e2emetrics.NewMetricsGrabber(ctx, fr.ClientSet, nil, fr.ClientConfig(), false, false, false, false, false, false) + framework.ExpectNoError(err) + + // create a pod with host-network for execing + hostExecPodName := "host-exec-pod" + hostExecPod := e2epod.NewExecPodSpec(fr.Namespace.Name, hostExecPodName, true) + nodeSelection := e2epod.NodeSelection{Name: nodeName} + e2epod.SetNodeSelection(&hostExecPod.Spec, nodeSelection) + e2epod.NewPodClient(fr).CreateSync(ctx, hostExecPod) + + // get proxyMode + stdout, err := e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, fmt.Sprintf("curl --silent 127.0.0.1:%d/proxyMode", ports.ProxyStatusPort)) + framework.ExpectNoError(err) + proxyMode := strings.TrimSpace(stdout) + + // get value of route_localnet + stdout, err = e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, "cat /proc/sys/net/ipv4/conf/all/route_localnet") + framework.ExpectNoError(err) + routeLocalnet := strings.TrimSpace(stdout) + + if !(proxyMode == string(config.ProxyModeIPTables) && routeLocalnet == "1") { + e2eskipper.Skipf("test requires iptables proxy mode with route_localnet set") + } + + // get value of target metric before accessing localhost nodeports + metrics, err := metricsGrabber.GrabFromKubeProxy(ctx, nodeName) + framework.ExpectNoError(err) + targetMetricBefore := metrics.GetCounterMetricValue(metricName) + + // create pod + ginkgo.By("creating test pod") + label := map[string]string{ + "app": "agnhost-localhost-nodeports", + } + httpPort := []v1.ContainerPort{ + { + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + } + pod := e2epod.NewAgnhostPod(ns, "agnhost-localhost-nodeports", nil, nil, httpPort, "netexec") + pod.Labels = label + e2epod.NewPodClient(fr).CreateSync(ctx, pod) + + // create nodeport service + ginkgo.By("creating test nodeport service") + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "agnhost-localhost-nodeports", + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeNodePort, + Selector: label, + Ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: 9000, + TargetPort: intstr.IntOrString{Type: 0, IntVal: 8080}, + }, + }, + }, + } + svc, err = fr.ClientSet.CoreV1().Services(fr.Namespace.Name).Create(ctx, svc, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + // wait for endpoints update + ginkgo.By("waiting for endpoints to be updated") + err = framework.WaitForServiceEndpointsNum(ctx, fr.ClientSet, ns, svc.Name, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err) + + ginkgo.By("accessing endpoint via localhost nodeports 10 times") + for i := 0; i < 10; i++ { + if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(_ context.Context) (bool, error) { + _, err = e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, fmt.Sprintf("curl --silent http://localhost:%d/hostname", svc.Spec.Ports[0].NodePort)) + if err != nil { + return false, nil + } + return true, nil + }); err != nil { + e2eskipper.Skipf("skipping test as localhost nodeports are not acceesible in this environment") + } + } + + // our target metric should be updated by now + if err := wait.PollUntilContextTimeout(ctx, 10*time.Second, 2*time.Minute, true, func(_ context.Context) (bool, error) { + metrics, err := metricsGrabber.GrabFromKubeProxy(ctx, nodeName) + framework.ExpectNoError(err) + targetMetricAfter := metrics.GetCounterMetricValue(metricName) + return targetMetricAfter > targetMetricBefore, nil + }); err != nil { + framework.Failf("expected %s metric to be updated after accessing endpoints via localhost nodeports", metricName) + } + }) })