Merge pull request #125015 from aroradaman/proxy-localhost-nodeports-metric

Kube-Proxy: Track packets accepted on localhost nodeports
This commit is contained in:
Kubernetes Prow Robot 2024-06-04 18:49:11 -07:00 committed by GitHub
commit e67f889edc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 276 additions and 39 deletions

View File

@ -162,7 +162,7 @@ func TestNumberIptablesRules(t *testing.T) {
services: 1,
epPerService: 1,
expectedFilterRules: 5,
expectedNatRules: 17,
expectedNatRules: 18,
},
{
name: "1 Services 2 EndpointPerService - LoadBalancer",
@ -177,7 +177,7 @@ func TestNumberIptablesRules(t *testing.T) {
services: 1,
epPerService: 2,
expectedFilterRules: 5,
expectedNatRules: 20,
expectedNatRules: 21,
},
{
name: "1 Services 10 EndpointPerService - LoadBalancer",
@ -192,7 +192,7 @@ func TestNumberIptablesRules(t *testing.T) {
services: 1,
epPerService: 10,
expectedFilterRules: 5,
expectedNatRules: 44,
expectedNatRules: 45,
},
{
name: "10 Services 0 EndpointsPerService - LoadBalancer",
@ -222,7 +222,7 @@ func TestNumberIptablesRules(t *testing.T) {
services: 10,
epPerService: 1,
expectedFilterRules: 14,
expectedNatRules: 125,
expectedNatRules: 135,
},
{
name: "10 Services 2 EndpointPerService - LoadBalancer",
@ -237,7 +237,7 @@ func TestNumberIptablesRules(t *testing.T) {
services: 10,
epPerService: 2,
expectedFilterRules: 14,
expectedNatRules: 155,
expectedNatRules: 165,
},
{
name: "10 Services 10 EndpointPerService - LoadBalancer",
@ -252,7 +252,7 @@ func TestNumberIptablesRules(t *testing.T) {
services: 10,
epPerService: 10,
expectedFilterRules: 14,
expectedNatRules: 395,
expectedNatRules: 405,
},
}

View File

@ -314,7 +314,10 @@ func NewProxier(ctx context.Context,
networkInterfacer: proxyutil.RealNetwork{},
conntrackTCPLiberal: conntrackTCPLiberal,
logger: logger,
nfAcctCounters: map[string]bool{metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: false},
nfAcctCounters: map[string]bool{
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: false,
metrics.LocalhostNodePortAcceptedNFAcctCounter: false,
},
}
burstSyncs := 2
@ -1183,6 +1186,16 @@ func (proxier *Proxier) syncProxyRules() {
// Jump to the external destination chain. For better or for
// worse, nodeports are not subject to loadBalancerSourceRanges,
// and we can't change that.
if proxier.localhostNodePorts && proxier.ipFamily == v1.IPv4Protocol && proxier.nfAcctCounters[metrics.LocalhostNodePortAcceptedNFAcctCounter] {
natRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcPortNameString,
"-m", protocol, "-p", protocol,
"-d", "127.0.0.0/8",
"--dport", strconv.Itoa(svcInfo.NodePort()),
"-m", "nfacct", "--nfacct-name", metrics.LocalhostNodePortAcceptedNFAcctCounter,
"-j", string(externalTrafficChain))
}
natRules.Write(
"-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcPortNameString,

View File

@ -113,6 +113,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
networkInterfacer.AddInterfaceAddr(&itf1, addrs1)
p := &Proxier{
ipFamily: ipfamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
@ -135,6 +136,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
localhostNodePorts: true,
nodePortAddresses: proxyutil.NewNodePortAddresses(ipfamily, nil),
networkInterfacer: networkInterfacer,
nfAcctCounters: map[string]bool{
metrics.IPTablesCTStateInvalidDroppedNFAcctCounter: true,
metrics.LocalhostNodePortAcceptedNFAcctCounter: true,
},
}
p.setInitialized(true)
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
@ -1717,7 +1722,7 @@ func TestOverallIPTablesRules(t *testing.T) {
-A KUBE-EXTERNAL-SERVICES -m comment --comment "ns2/svc2:p80 has no local endpoints" -m tcp -p tcp -d 1.2.3.4 --dport 80 -j DROP
-A KUBE-EXTERNAL-SERVICES -m comment --comment "ns2/svc2:p80 has no local endpoints" -m addrtype --dst-type LOCAL -m tcp -p tcp --dport 3001 -j DROP
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A KUBE-PROXY-FIREWALL -m comment --comment "ns5/svc5:p80 traffic not accepted by KUBE-FW-NUKIZ6OKUXPJNT4C" -m tcp -p tcp -d 5.6.7.8 --dport 80 -j DROP
@ -1743,8 +1748,11 @@ func TestOverallIPTablesRules(t *testing.T) {
:KUBE-SVC-NUKIZ6OKUXPJNT4C - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
-A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp -d 127.0.0.0/8 --dport 3001 -m nfacct --nfacct-name localhost_nps_accepted_pkts -j KUBE-EXT-GNZBNJ2PO5MGZ6GT
-A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp --dport 3001 -j KUBE-EXT-GNZBNJ2PO5MGZ6GT
-A KUBE-NODEPORTS -m comment --comment ns3/svc3:p80 -m tcp -p tcp -d 127.0.0.0/8 --dport 3003 -m nfacct --nfacct-name localhost_nps_accepted_pkts -j KUBE-EXT-X27LE4BHSL4DOUIK
-A KUBE-NODEPORTS -m comment --comment ns3/svc3:p80 -m tcp -p tcp --dport 3003 -j KUBE-EXT-X27LE4BHSL4DOUIK
-A KUBE-NODEPORTS -m comment --comment ns5/svc5:p80 -m tcp -p tcp -d 127.0.0.0/8 --dport 3002 -m nfacct --nfacct-name localhost_nps_accepted_pkts -j KUBE-EXT-NUKIZ6OKUXPJNT4C
-A KUBE-NODEPORTS -m comment --comment ns5/svc5:p80 -m tcp -p tcp --dport 3002 -j KUBE-EXT-NUKIZ6OKUXPJNT4C
-A KUBE-SERVICES -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-SERVICES -m comment --comment "ns2/svc2:p80 cluster IP" -m tcp -p tcp -d 172.30.0.42 --dport 80 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
@ -5916,7 +5924,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -5999,7 +6007,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6053,7 +6061,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6117,7 +6125,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-SERVICES -m comment --comment "ns4/svc4:p80 has no endpoints" -m tcp -p tcp -d 172.30.0.44 --dport 80 -j REJECT
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6173,7 +6181,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6230,7 +6238,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6286,7 +6294,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6344,7 +6352,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT
@ -6435,7 +6443,7 @@ func TestSyncProxyRulesRepeated(t *testing.T) {
:KUBE-FORWARD - [0:0]
:KUBE-PROXY-FIREWALL - [0:0]
-A KUBE-FIREWALL -m comment --comment "block incoming localnet connections" -d 127.0.0.0/8 ! -s 127.0.0.0/8 -m conntrack ! --ctstate RELATED,ESTABLISHED,DNAT -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m conntrack --ctstate INVALID -m nfacct --nfacct-name ct_state_invalid_dropped_pkts -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
COMMIT

View File

@ -147,7 +147,8 @@ var (
"kubeproxy_iptables_ct_state_invalid_dropped_packets_total",
"packets dropped by iptables to work around conntrack problems",
nil, nil, metrics.ALPHA, "")
IPTablesCTStateInvalidDroppedNFAcctCounter = "ct_state_invalid_dropped_pkts"
IPTablesCTStateInvalidDroppedNFAcctCounter = "ct_state_invalid_dropped_pkts"
iptablesCTStateInvalidDroppedMetricCollector = newNFAcctMetricCollector(IPTablesCTStateInvalidDroppedNFAcctCounter, iptablesCTStateInvalidDroppedPacketsDescription)
// IPTablesRestoreFailuresTotal is the number of iptables restore failures that the proxy has
// seen.
@ -265,6 +266,15 @@ var (
},
[]string{"traffic_policy"},
)
// localhostNodePortsAcceptedPacketsDescription describes the metrics for the number of packets accepted
// by iptables which were destined for nodeports on loopback interface.
localhostNodePortsAcceptedPacketsDescription = metrics.NewDesc(
"kubeproxy_iptables_localhost_nodeports_accepted_packets_total",
"Number of packets accepted on nodeports of loopback interface",
nil, nil, metrics.ALPHA, "")
LocalhostNodePortAcceptedNFAcctCounter = "localhost_nps_accepted_pkts"
localhostNodePortsAcceptedMetricsCollector = newNFAcctMetricCollector(LocalhostNodePortAcceptedNFAcctCounter, localhostNodePortsAcceptedPacketsDescription)
)
var registerMetricsOnce sync.Once
@ -289,7 +299,8 @@ func RegisterMetrics(mode kubeproxyconfig.ProxyMode) {
switch mode {
case kubeproxyconfig.ProxyModeIPTables:
legacyregistry.CustomMustRegister(newCTStateInvalidPacketsCollector())
legacyregistry.CustomMustRegister(iptablesCTStateInvalidDroppedMetricCollector)
legacyregistry.CustomMustRegister(localhostNodePortsAcceptedMetricsCollector)
legacyregistry.MustRegister(SyncFullProxyRulesLatency)
legacyregistry.MustRegister(SyncPartialProxyRulesLatency)
legacyregistry.MustRegister(IPTablesRestoreFailuresTotal)
@ -315,34 +326,40 @@ func SinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}
var _ metrics.StableCollector = &ctStateInvalidPacketsCollector{}
var _ metrics.StableCollector = &nfacctMetricCollector{}
func newCTStateInvalidPacketsCollector() *ctStateInvalidPacketsCollector {
func newNFAcctMetricCollector(counter string, description *metrics.Desc) *nfacctMetricCollector {
client, err := nfacct.New()
if err != nil {
klog.ErrorS(err, "failed to initialize nfacct client")
}
return &ctStateInvalidPacketsCollector{client: client}
return &nfacctMetricCollector{
client: client,
counter: counter,
description: description,
}
}
type ctStateInvalidPacketsCollector struct {
type nfacctMetricCollector struct {
metrics.BaseStableCollector
client nfacct.Interface
client nfacct.Interface
counter string
description *metrics.Desc
}
// DescribeWithStability implements the metrics.StableCollector interface.
func (c *ctStateInvalidPacketsCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- iptablesCTStateInvalidDroppedPacketsDescription
func (n *nfacctMetricCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
ch <- n.description
}
// CollectWithStability implements the metrics.StableCollector interface.
func (c *ctStateInvalidPacketsCollector) CollectWithStability(ch chan<- metrics.Metric) {
if c.client != nil {
counter, err := c.client.Get(IPTablesCTStateInvalidDroppedNFAcctCounter)
func (n *nfacctMetricCollector) CollectWithStability(ch chan<- metrics.Metric) {
if n.client != nil {
counter, err := n.client.Get(n.counter)
if err != nil {
klog.ErrorS(err, "failed to collect nfacct counter")
klog.ErrorS(err, "failed to collect nfacct counter", "counter", n.counter)
} else {
metric, err := metrics.NewConstMetric(iptablesCTStateInvalidDroppedPacketsDescription, metrics.CounterValue, float64(counter.Packets))
metric, err := metrics.NewConstMetric(n.description, metrics.CounterValue, float64(counter.Packets))
if err != nil {
klog.ErrorS(err, "failed to create constant metric")
} else {

View File

@ -0,0 +1,42 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"k8s.io/component-base/metrics/testutil"
)
// KubeProxyMetrics is metrics for kube-proxy
type KubeProxyMetrics testutil.Metrics

// GetCounterMetricValue returns value for metric type counter.
//
// It returns the value of the first sample recorded for metricName.
// NOTE(review): the bare [metricName][0] index panics if the metric is not
// present in the scrape — callers must only request metrics they know exist.
func (m *KubeProxyMetrics) GetCounterMetricValue(metricName string) float64 {
	return float64(testutil.Metrics(*m)[metricName][0].Value)
}
// newKubeProxyMetricsMetrics returns an empty, initialized KubeProxyMetrics
// collection, ready to be filled in by the metrics parser.
func newKubeProxyMetricsMetrics() KubeProxyMetrics {
	return KubeProxyMetrics(testutil.NewMetrics())
}
// parseKubeProxyMetrics parses Prometheus text-format metrics output into a
// KubeProxyMetrics collection, returning an empty value on parse failure.
func parseKubeProxyMetrics(data string) (KubeProxyMetrics, error) {
	parsed := newKubeProxyMetricsMetrics()
	err := testutil.ParseMetrics(data, (*testutil.Metrics)(&parsed))
	if err != nil {
		return KubeProxyMetrics{}, err
	}
	return parsed, nil
}

View File

@ -22,6 +22,7 @@ import (
"fmt"
"net"
"regexp"
"strconv"
"sync"
"time"
@ -32,8 +33,9 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
)
const (
@ -43,6 +45,8 @@ const (
kubeControllerManagerPort = 10257
// snapshotControllerPort is the port for the snapshot controller
snapshotControllerPort = 9102
// kubeProxyPort is the default port for the kube-proxy status server.
kubeProxyPort = 10249
)
// MetricsGrabbingDisabledError is an error that is wrapped by the
@ -233,6 +237,45 @@ func (g *Grabber) getMetricsFromNode(ctx context.Context, nodeName string, kubel
}
}
// GrabFromKubeProxy returns metrics from kube-proxy
//
// The named node is looked up first so that a missing or ambiguous node name
// fails fast with a clear error, instead of surfacing later as a
// pod-scheduling failure inside grabFromKubeProxy.
func (g *Grabber) GrabFromKubeProxy(ctx context.Context, nodeName string) (KubeProxyMetrics, error) {
	nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{"metadata.name": nodeName}.AsSelector().String()})
	if err != nil {
		return KubeProxyMetrics{}, err
	}
	if len(nodes.Items) != 1 {
		return KubeProxyMetrics{}, fmt.Errorf("error listing nodes with name %v, got %v", nodeName, nodes.Items)
	}
	// Fetch the raw text-format metrics from the node, then parse them into
	// the KubeProxyMetrics map.
	output, err := g.grabFromKubeProxy(ctx, nodeName)
	if err != nil {
		return KubeProxyMetrics{}, err
	}
	return parseKubeProxyMetrics(output)
}
// grabFromKubeProxy fetches the raw text-format output of kube-proxy's
// /metrics endpoint on the given node by running curl from a host-network
// exec pod scheduled onto that node.
func (g *Grabber) grabFromKubeProxy(ctx context.Context, nodeName string) (string, error) {
	hostCmdPodName := fmt.Sprintf("grab-kube-proxy-metrics-%s", framework.RandomSuffix())
	hostCmdPod := e2epod.NewExecPodSpec(metav1.NamespaceSystem, hostCmdPodName, true)
	nodeSelection := e2epod.NodeSelection{Name: nodeName}
	e2epod.SetNodeSelection(&hostCmdPod.Spec, nodeSelection)
	if _, err := g.client.CoreV1().Pods(metav1.NamespaceSystem).Create(ctx, hostCmdPod, metav1.CreateOptions{}); err != nil {
		return "", fmt.Errorf("failed to create pod to fetch metrics: %w", err)
	}
	// Best-effort cleanup: previously the pod was only deleted after a
	// successful curl, leaking it whenever the readiness wait failed. A defer
	// removes the scratch pod on every exit path.
	defer func() {
		_ = g.client.CoreV1().Pods(metav1.NamespaceSystem).Delete(ctx, hostCmdPodName, metav1.DeleteOptions{})
	}()
	if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, hostCmdPodName, metav1.NamespaceSystem, 5*time.Minute); err != nil {
		return "", fmt.Errorf("error waiting for pod to be up: %w", err)
	}
	// Pick the loopback address matching the cluster's IP family; the metrics
	// endpoint is reached via localhost from the host-network pod.
	host := "127.0.0.1"
	if framework.TestContext.ClusterIsIPv6() {
		host = "::1"
	}
	stdout, err := e2epodoutput.RunHostCmd(metav1.NamespaceSystem, hostCmdPodName, fmt.Sprintf("curl --silent %s/metrics", net.JoinHostPort(host, strconv.Itoa(kubeProxyPort))))
	return stdout, err
}
// GrabFromScheduler returns metrics from scheduler
func (g *Grabber) GrabFromScheduler(ctx context.Context) (SchedulerMetrics, error) {
if !g.grabFromScheduler {

View File

@ -25,22 +25,25 @@ import (
"strings"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/proxy/apis/config"
"k8s.io/kubernetes/test/e2e/framework"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/kubernetes/test/e2e/network/common"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
netutils "k8s.io/utils/net"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
)
var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost)
@ -220,7 +223,7 @@ var _ = common.SIGDescribe("KubeProxy", func() {
"| grep -m 1 'CLOSE_WAIT.*dport=%v' ",
ipFamily, ip, testDaemonTCPPort)
if err := wait.PollImmediate(2*time.Second, epsilonSeconds*time.Second, func() (bool, error) {
result, err := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd)
result, err := e2epodoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd)
// retry if we can't obtain the conntrack entry
if err != nil {
framework.Logf("failed to obtain conntrack entry: %v %v", result, err)
@ -242,7 +245,7 @@ var _ = common.SIGDescribe("KubeProxy", func() {
return false, fmt.Errorf("wrong TCP CLOSE_WAIT timeout: %v expected: %v", timeoutSeconds, expectedTimeoutSeconds)
}); err != nil {
// Dump all conntrack entries for debugging
result, err2 := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L")
result, err2 := e2epodoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L")
if err2 != nil {
framework.Logf("failed to obtain conntrack entry: %v %v", result, err2)
}
@ -251,4 +254,115 @@ var _ = common.SIGDescribe("KubeProxy", func() {
}
})
// Exercises the nfacct-backed kube-proxy metric that counts packets accepted
// on nodeports reached via the loopback interface (iptables mode only).
ginkgo.It("should update metric for tracking accepted packets destined for localhost nodeports", func(ctx context.Context) {
	// Localhost nodeports are an IPv4-only kube-proxy feature.
	if framework.TestContext.ClusterIsIPv6() {
		e2eskipper.Skipf("test requires IPv4 cluster")
	}

	cs := fr.ClientSet
	ns := fr.Namespace.Name

	nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 1)
	framework.ExpectNoError(err)
	if len(nodes.Items) < 1 {
		e2eskipper.Skipf(
			"Test requires >= 1 Ready nodes, but there are only %v nodes",
			len(nodes.Items))
	}
	nodeName := nodes.Items[0].Name
	metricName := "kubeproxy_iptables_localhost_nodeports_accepted_packets_total"
	metricsGrabber, err := e2emetrics.NewMetricsGrabber(ctx, fr.ClientSet, nil, fr.ClientConfig(), false, false, false, false, false, false)
	framework.ExpectNoError(err)

	// create a pod with host-network for exec'ing curl against localhost
	hostExecPodName := "host-exec-pod"
	hostExecPod := e2epod.NewExecPodSpec(fr.Namespace.Name, hostExecPodName, true)
	nodeSelection := e2epod.NodeSelection{Name: nodeName}
	e2epod.SetNodeSelection(&hostExecPod.Spec, nodeSelection)
	e2epod.NewPodClient(fr).CreateSync(ctx, hostExecPod)

	// get proxyMode from kube-proxy's status endpoint
	stdout, err := e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, fmt.Sprintf("curl --silent 127.0.0.1:%d/proxyMode", ports.ProxyStatusPort))
	framework.ExpectNoError(err)
	proxyMode := strings.TrimSpace(stdout)

	// get value of route_localnet
	stdout, err = e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, "cat /proc/sys/net/ipv4/conf/all/route_localnet")
	framework.ExpectNoError(err)
	routeLocalnet := strings.TrimSpace(stdout)

	// The metric is emitted only by the iptables proxier, and localhost
	// nodeports function only when route_localnet is enabled.
	if !(proxyMode == string(config.ProxyModeIPTables) && routeLocalnet == "1") {
		e2eskipper.Skipf("test requires iptables proxy mode with route_localnet set")
	}

	// get value of target metric before accessing localhost nodeports
	metrics, err := metricsGrabber.GrabFromKubeProxy(ctx, nodeName)
	framework.ExpectNoError(err)
	targetMetricBefore := metrics.GetCounterMetricValue(metricName)

	// create pod
	ginkgo.By("creating test pod")
	label := map[string]string{
		"app": "agnhost-localhost-nodeports",
	}
	httpPort := []v1.ContainerPort{
		{
			ContainerPort: 8080,
			Protocol:      v1.ProtocolTCP,
		},
	}
	pod := e2epod.NewAgnhostPod(ns, "agnhost-localhost-nodeports", nil, nil, httpPort, "netexec")
	pod.Labels = label
	e2epod.NewPodClient(fr).CreateSync(ctx, pod)

	// create nodeport service
	ginkgo.By("creating test nodeport service")
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: "agnhost-localhost-nodeports",
		},
		Spec: v1.ServiceSpec{
			Type:     v1.ServiceTypeNodePort,
			Selector: label,
			Ports: []v1.ServicePort{
				{
					Protocol:   v1.ProtocolTCP,
					Port:       9000,
					TargetPort: intstr.IntOrString{Type: 0, IntVal: 8080},
				},
			},
		},
	}
	svc, err = fr.ClientSet.CoreV1().Services(fr.Namespace.Name).Create(ctx, svc, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	// wait for endpoints update
	ginkgo.By("waiting for endpoints to be updated")
	err = framework.WaitForServiceEndpointsNum(ctx, fr.ClientSet, ns, svc.Name, 1, time.Second, wait.ForeverTestTimeout)
	framework.ExpectNoError(err)

	ginkgo.By("accessing endpoint via localhost nodeports 10 times")
	for i := 0; i < 10; i++ {
		if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(_ context.Context) (bool, error) {
			_, err = e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, fmt.Sprintf("curl --silent http://localhost:%d/hostname", svc.Spec.Ports[0].NodePort))
			if err != nil {
				return false, nil
			}
			return true, nil
		}); err != nil {
			// Environments that block loopback nodeport access cannot run
			// this test; skip rather than fail. (Fixed typo: "acceesible".)
			e2eskipper.Skipf("skipping test as localhost nodeports are not accessible in this environment")
		}
	}

	// our target metric should be updated by now
	if err := wait.PollUntilContextTimeout(ctx, 10*time.Second, 2*time.Minute, true, func(_ context.Context) (bool, error) {
		metrics, err := metricsGrabber.GrabFromKubeProxy(ctx, nodeName)
		framework.ExpectNoError(err)
		targetMetricAfter := metrics.GetCounterMetricValue(metricName)
		return targetMetricAfter > targetMetricBefore, nil
	}); err != nil {
		framework.Failf("expected %s metric to be updated after accessing endpoints via localhost nodeports", metricName)
	}
})
})