diff --git a/test/e2e/network/kube_proxy.go b/test/e2e/network/kube_proxy.go index 81a177517a2..3435122d942 100644 --- a/test/e2e/network/kube_proxy.go +++ b/test/e2e/network/kube_proxy.go @@ -25,22 +25,25 @@ import ( "strings" "time" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" - + "k8s.io/kubernetes/pkg/cluster/ports" + "k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/test/e2e/framework" + e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" - e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" + e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "k8s.io/kubernetes/test/e2e/network/common" imageutils "k8s.io/kubernetes/test/utils/image" admissionapi "k8s.io/pod-security-admission/api" netutils "k8s.io/utils/net" - - "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" ) var kubeProxyE2eImage = imageutils.GetE2EImage(imageutils.Agnhost) @@ -220,7 +223,7 @@ var _ = common.SIGDescribe("KubeProxy", func() { "| grep -m 1 'CLOSE_WAIT.*dport=%v' ", ipFamily, ip, testDaemonTCPPort) if err := wait.PollImmediate(2*time.Second, epsilonSeconds*time.Second, func() (bool, error) { - result, err := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd) + result, err := e2epodoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", cmd) // retry if we can't obtain the conntrack entry if err != nil { framework.Logf("failed to obtain conntrack entry: %v %v", result, err) @@ -242,7 +245,7 @@ var _ = common.SIGDescribe("KubeProxy", func() { return false, fmt.Errorf("wrong TCP CLOSE_WAIT timeout: %v expected: %v", timeoutSeconds, expectedTimeoutSeconds) }); err != nil { // Dump all conntrack entries for debugging - result, err2 := e2eoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L") + result, err2 := e2epodoutput.RunHostCmd(fr.Namespace.Name, "e2e-net-exec", "conntrack -L") if err2 != nil { framework.Logf("failed to obtain conntrack entry: %v %v", result, err2) } @@ -251,4 +254,115 @@ var _ = common.SIGDescribe("KubeProxy", func() { } }) + ginkgo.It("should update metric for tracking accepted packets destined for localhost nodeports", func(ctx context.Context) { + if framework.TestContext.ClusterIsIPv6() { + e2eskipper.Skipf("test requires IPv4 cluster") + } + + cs := fr.ClientSet + ns := fr.Namespace.Name + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 1) + framework.ExpectNoError(err) + if len(nodes.Items) < 1 { + e2eskipper.Skipf( + "Test requires >= 1 Ready nodes, but there are only %v nodes", + len(nodes.Items)) + } + nodeName := nodes.Items[0].Name + + metricName := "kubeproxy_iptables_localhost_nodeports_accepted_packets_total" + metricsGrabber, err := e2emetrics.NewMetricsGrabber(ctx, fr.ClientSet, nil, fr.ClientConfig(), false, false, false, false, false, false) + framework.ExpectNoError(err) + + // create a pod with host-network for execing + hostExecPodName := "host-exec-pod" + hostExecPod := e2epod.NewExecPodSpec(fr.Namespace.Name, hostExecPodName, true) + nodeSelection := e2epod.NodeSelection{Name: nodeName} + e2epod.SetNodeSelection(&hostExecPod.Spec, nodeSelection) + e2epod.NewPodClient(fr).CreateSync(ctx, hostExecPod) + + // get proxyMode + stdout, err := e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, fmt.Sprintf("curl --silent 127.0.0.1:%d/proxyMode", ports.ProxyStatusPort)) + framework.ExpectNoError(err) + proxyMode := strings.TrimSpace(stdout) + + // get value of route_localnet + stdout, err = e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, "cat /proc/sys/net/ipv4/conf/all/route_localnet") + framework.ExpectNoError(err) + routeLocalnet := strings.TrimSpace(stdout) + + if !(proxyMode == string(config.ProxyModeIPTables) && routeLocalnet == "1") { + e2eskipper.Skipf("test requires iptables proxy mode with route_localnet set") + } + + // get value of target metric before accessing localhost nodeports + metrics, err := metricsGrabber.GrabFromKubeProxy(ctx, nodeName) + framework.ExpectNoError(err) + targetMetricBefore := metrics.GetCounterMetricValue(metricName) + + // create pod + ginkgo.By("creating test pod") + label := map[string]string{ + "app": "agnhost-localhost-nodeports", + } + httpPort := []v1.ContainerPort{ + { + ContainerPort: 8080, + Protocol: v1.ProtocolTCP, + }, + } + pod := e2epod.NewAgnhostPod(ns, "agnhost-localhost-nodeports", nil, nil, httpPort, "netexec") + pod.Labels = label + e2epod.NewPodClient(fr).CreateSync(ctx, pod) + + // create nodeport service + ginkgo.By("creating test nodeport service") + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "agnhost-localhost-nodeports", + }, + Spec: v1.ServiceSpec{ + Type: v1.ServiceTypeNodePort, + Selector: label, + Ports: []v1.ServicePort{ + { + Protocol: v1.ProtocolTCP, + Port: 9000, + TargetPort: intstr.IntOrString{Type: 0, IntVal: 8080}, + }, + }, + }, + } + svc, err = fr.ClientSet.CoreV1().Services(fr.Namespace.Name).Create(ctx, svc, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + // wait for endpoints update + ginkgo.By("waiting for endpoints to be updated") + err = framework.WaitForServiceEndpointsNum(ctx, fr.ClientSet, ns, svc.Name, 1, time.Second, wait.ForeverTestTimeout) + framework.ExpectNoError(err) + + ginkgo.By("accessing endpoint via localhost nodeports 10 times") + for i := 0; i < 10; i++ { + if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(_ context.Context) (bool, error) { + _, err = e2epodoutput.RunHostCmd(fr.Namespace.Name, hostExecPodName, fmt.Sprintf("curl --silent http://localhost:%d/hostname", svc.Spec.Ports[0].NodePort)) + if err != nil { + return false, nil + } + return true, nil + }); err != nil { + e2eskipper.Skipf("skipping test as localhost nodeports are not acceesible in this environment") + } + } + + // our target metric should be updated by now + if err := wait.PollUntilContextTimeout(ctx, 10*time.Second, 2*time.Minute, true, func(_ context.Context) (bool, error) { + metrics, err := metricsGrabber.GrabFromKubeProxy(ctx, nodeName) + framework.ExpectNoError(err) + targetMetricAfter := metrics.GetCounterMetricValue(metricName) + return targetMetricAfter > targetMetricBefore, nil + }); err != nil { + framework.Failf("expected %s metric to be updated after accessing endpoints via localhost nodeports", metricName) + } + }) })