From 5230bab600cb6371e296635191bd78649e1b8e41 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 29 May 2024 16:21:34 -0400 Subject: [PATCH 1/3] Convert test/e2e/network/service.go to utils/ptr --- test/e2e/network/service.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index b9e39b87f54..ea5218f8915 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -54,7 +54,7 @@ import ( cloudprovider "k8s.io/cloud-provider" netutils "k8s.io/utils/net" - utilpointer "k8s.io/utils/pointer" + "k8s.io/utils/ptr" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" @@ -1939,7 +1939,7 @@ var _ = common.SIGDescribe("Services", func() { }, } webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To(gracePeriod) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2058,7 +2058,7 @@ var _ = common.SIGDescribe("Services", func() { }, } webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To(gracePeriod) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2722,7 +2722,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2814,7 +2814,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2889,7 +2889,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2966,7 +2966,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -3042,7 +3042,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -3347,7 +3347,7 @@ var _ = common.SIGDescribe("Services", func() { Port: int32(80), TargetPort: intstr.FromInt32(80), }}, - LoadBalancerClass: utilpointer.String("example.com/internal-vip"), + LoadBalancerClass: ptr.To("example.com/internal-vip"), }, } _, err = cs.CoreV1().Services(ns).Create(ctx, &testService, metav1.CreateOptions{}) From 41527afe28dd558e33daca2a2bcfb36c0e0f9740 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 8 May 2024 11:23:41 -0400 Subject: [PATCH 2/3] Move eTP:Local NodePort test from loadbalancer.go to service.go (And in particular, remove `[Feature:LoadBalancer]` from it.) --- test/e2e/network/loadbalancer.go | 32 -------------------------------- test/e2e/network/service.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/test/e2e/network/loadbalancer.go b/test/e2e/network/loadbalancer.go index 6c4d254dd67..4374d681aaf 100644 --- a/test/e2e/network/loadbalancer.go +++ b/test/e2e/network/loadbalancer.go @@ -1014,38 +1014,6 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature } }) - ginkgo.It("should work for type=NodePort", func(ctx context.Context) { - namespace := f.Namespace.Name - serviceName := "external-local-nodeport" - jig := e2eservice.NewTestJig(cs, namespace, serviceName) - - svc, err := jig.CreateOnlyLocalNodePortService(ctx, true) - framework.ExpectNoError(err) - ginkgo.DeferCleanup(func(ctx context.Context) { - err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - }) - - tcpNodePort := int(svc.Spec.Ports[0].NodePort) - - endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) - framework.ExpectNoError(err) - - dialCmd := "clientip" - config := e2enetwork.NewNetworkingTestConfig(ctx, f) - - for nodeName, nodeIP := range endpointsNodeMap { - ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd)) - clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd) - framework.ExpectNoError(err) - framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP) - // the clientIP returned by agnhost contains port - if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) { - framework.Failf("Source IP was NOT preserved") - } - } - }) - ginkgo.It("should only target nodes with endpoints", func(ctx context.Context) { namespace := f.Namespace.Name serviceName := "external-local-nodes" diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index ea5218f8915..d8db9f890bc 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -2692,6 +2692,38 @@ var _ = common.SIGDescribe("Services", func() { } }) + ginkgo.It("should support externalTrafficPolicy=Local for type=NodePort", func(ctx context.Context) { + namespace := f.Namespace.Name + serviceName := "external-local-nodeport" + jig := e2eservice.NewTestJig(cs, namespace, serviceName) + + svc, err := jig.CreateOnlyLocalNodePortService(ctx, true) + framework.ExpectNoError(err) + ginkgo.DeferCleanup(func(ctx context.Context) { + err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + }) + + tcpNodePort := int(svc.Spec.Ports[0].NodePort) + + endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) + framework.ExpectNoError(err) + + dialCmd := "clientip" + config := e2enetwork.NewNetworkingTestConfig(ctx, f) + + for nodeName, nodeIP := range endpointsNodeMap { + ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd)) + clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd) + framework.ExpectNoError(err) + framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP) + // the clientIP returned by agnhost contains port + if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) { + framework.Failf("Source IP was NOT preserved") + } + } + }) + ginkgo.It("should fail health check node port if there are only terminating endpoints", func(ctx context.Context) { // windows kube-proxy does not support this feature yet e2eskipper.SkipIfNodeOSDistroIs("windows") From fff883ab4a6cc87d1512e0098973bf2cac4e34b1 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 8 May 2024 12:24:11 -0400 Subject: [PATCH 3/3] Improve eTP:Local NodePort test It previously assumed that pod-to-other-node-nodeIP would be unmasqueraded, but this is not the case for most network plugins. Use a HostNetwork exec pod to avoid problems. This also requires putting the client and endpoint on different nodes, because with most network plugins, a node-to-same-node-pod connection will end up using the internal "docker0" (or whatever) IP as the source address rather than the node's public IP, and we don't know what that IP is. Also make it work with IPv6. --- test/e2e/network/service.go | 82 +++++++++++++++++++++++++++---------- 1 file changed, 61 insertions(+), 21 deletions(-) diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index d8db9f890bc..f7e7e248d1b 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -2693,34 +2693,74 @@ var _ = common.SIGDescribe("Services", func() { }) ginkgo.It("should support externalTrafficPolicy=Local for type=NodePort", func(ctx context.Context) { + e2eskipper.SkipUnlessNodeCountIsAtLeast(2) + namespace := f.Namespace.Name serviceName := "external-local-nodeport" jig := e2eservice.NewTestJig(cs, namespace, serviceName) - svc, err := jig.CreateOnlyLocalNodePortService(ctx, true) - framework.ExpectNoError(err) - ginkgo.DeferCleanup(func(ctx context.Context) { - err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - }) - + ginkgo.By("creating the service") + svc, err := jig.CreateOnlyLocalNodePortService(ctx, false) + framework.ExpectNoError(err, "creating the service") tcpNodePort := int(svc.Spec.Ports[0].NodePort) + nodePortStr := fmt.Sprintf("%d", tcpNodePort) + framework.Logf("NodePort is %s", nodePortStr) - endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) - framework.ExpectNoError(err) + ginkgo.By("creating a HostNetwork exec pod") + execPod := launchHostExecPod(ctx, cs, namespace, "hostexec") + execPod, err = cs.CoreV1().Pods(namespace).Get(ctx, execPod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "getting podIP of execPod") + framework.Logf("execPod IP is %q", execPod.Status.PodIP) - dialCmd := "clientip" - config := e2enetwork.NewNetworkingTestConfig(ctx, f) - - for nodeName, nodeIP := range endpointsNodeMap { - ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd)) - clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd) - framework.ExpectNoError(err) - framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP) - // the clientIP returned by agnhost contains port - if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) { - framework.Failf("Source IP was NOT preserved") + ginkgo.By("creating an endpoint for the service on a different node from the execPod") + _, err = jig.Run(ctx, func(rc *v1.ReplicationController) { + rc.Spec.Template.Spec.Affinity = &v1.Affinity{ + // We need to ensure the endpoint is on a different node + // from the exec pod, to ensure that the source IP of the + // traffic is the node's "public" IP. For + // node-to-pod-on-same-node traffic, it might end up using + // the "docker0" IP or something like that. + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "metadata.name", + Operator: "NotIn", + Values: []string{execPod.Spec.NodeName}, + }}, + }}, + }, + }, } + }) + framework.ExpectNoError(err, "creating the endpoint pod") + + // Extract the single endpoint node IP from a map of endpoint node IPs + var endpointNodeIP string + endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) + framework.ExpectNoError(err, "fetching endpoint node IPs") + for node, nodeIP := range endpointsNodeMap { + framework.Logf("endpoint is on node %s (%s)", node, nodeIP) + endpointNodeIP = nodeIP + break + } + + ginkgo.By("connecting from the execpod to the NodePort on the endpoint's node") + cmd := fmt.Sprintf("curl -g -q -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(endpointNodeIP, nodePortStr)) + var clientIP string + err = wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) { + clientIPPort, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err != nil { + framework.Logf("error connecting: %v", err) + return false, nil + } + clientIP, _, err = net.SplitHostPort(clientIPPort) + framework.ExpectNoError(err, "parsing clientip output") + return true, nil + }) + framework.ExpectNoError(err, "connecting to nodeport service") + if clientIP != execPod.Status.PodIP { + framework.Failf("Source IP %s is not the client IP", clientIP) } }) @@ -2795,7 +2835,7 @@ var _ = common.SIGDescribe("Services", func() { framework.ExpectNoError(err) // validate that the health check node port from kube-proxy returns 503 when there are no ready endpoints - err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + err = wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) { cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --max-time 5 http://%s/healthz`, healthCheckNodePortAddr) out, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd) if err != nil {