diff --git a/test/e2e/network/loadbalancer.go b/test/e2e/network/loadbalancer.go index 33ace3b8af8..c2fa76bad02 100644 --- a/test/e2e/network/loadbalancer.go +++ b/test/e2e/network/loadbalancer.go @@ -1075,38 +1075,6 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature } }) - ginkgo.It("should work for type=NodePort", func(ctx context.Context) { - namespace := f.Namespace.Name - serviceName := "external-local-nodeport" - jig := e2eservice.NewTestJig(cs, namespace, serviceName) - - svc, err := jig.CreateOnlyLocalNodePortService(ctx, true) - framework.ExpectNoError(err) - ginkgo.DeferCleanup(func(ctx context.Context) { - err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - }) - - tcpNodePort := int(svc.Spec.Ports[0].NodePort) - - endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) - framework.ExpectNoError(err) - - dialCmd := "clientip" - config := e2enetwork.NewNetworkingTestConfig(ctx, f) - - for nodeName, nodeIP := range endpointsNodeMap { - ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd)) - clientIP, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd) - framework.ExpectNoError(err) - framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP) - // the clientIP returned by agnhost contains port - if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) { - framework.Failf("Source IP was NOT preserved") - } - } - }) - ginkgo.It("should only target nodes with endpoints", func(ctx context.Context) { namespace := f.Namespace.Name serviceName := "external-local-nodes" diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index b9e39b87f54..f7e7e248d1b 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -54,7 +54,7 @@ import ( cloudprovider "k8s.io/cloud-provider" netutils "k8s.io/utils/net" - utilpointer "k8s.io/utils/pointer" + "k8s.io/utils/ptr" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/test/e2e/framework" @@ -1939,7 +1939,7 @@ var _ = common.SIGDescribe("Services", func() { }, } webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To(gracePeriod) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2058,7 +2058,7 @@ var _ = common.SIGDescribe("Services", func() { }, } webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To(gracePeriod) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2692,6 +2692,78 @@ var _ = common.SIGDescribe("Services", func() { } }) + ginkgo.It("should support externalTrafficPolicy=Local for type=NodePort", func(ctx context.Context) { + e2eskipper.SkipUnlessNodeCountIsAtLeast(2) + + namespace := f.Namespace.Name + serviceName := "external-local-nodeport" + jig := e2eservice.NewTestJig(cs, namespace, serviceName) + + ginkgo.By("creating the service") + svc, err := jig.CreateOnlyLocalNodePortService(ctx, false) + framework.ExpectNoError(err, "creating the service") + tcpNodePort := int(svc.Spec.Ports[0].NodePort) + nodePortStr := fmt.Sprintf("%d", tcpNodePort) + framework.Logf("NodePort is %s", nodePortStr) + + ginkgo.By("creating a HostNetwork exec pod") + execPod := launchHostExecPod(ctx, cs, namespace, "hostexec") + execPod, err = cs.CoreV1().Pods(namespace).Get(ctx, execPod.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "getting podIP of execPod") + framework.Logf("execPod IP is %q", execPod.Status.PodIP) + + ginkgo.By("creating an endpoint for the service on a different node from the execPod") + _, err = jig.Run(ctx, func(rc *v1.ReplicationController) { + rc.Spec.Template.Spec.Affinity = &v1.Affinity{ + // We need to ensure the endpoint is on a different node + // from the exec pod, to ensure that the source IP of the + // traffic is the node's "public" IP. For + // node-to-pod-on-same-node traffic, it might end up using + // the "docker0" IP or something like that. + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "metadata.name", + Operator: "NotIn", + Values: []string{execPod.Spec.NodeName}, + }}, + }}, + }, + }, + } + }) + framework.ExpectNoError(err, "creating the endpoint pod") + + // Extract the single endpoint node IP from a map of endpoint node IPs + var endpointNodeIP string + endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) + framework.ExpectNoError(err, "fetching endpoint node IPs") + for node, nodeIP := range endpointsNodeMap { + framework.Logf("endpoint is on node %s (%s)", node, nodeIP) + endpointNodeIP = nodeIP + break + } + + ginkgo.By("connecting from the execpod to the NodePort on the endpoint's node") + cmd := fmt.Sprintf("curl -g -q -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(endpointNodeIP, nodePortStr)) + var clientIP string + err = wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) { + clientIPPort, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + if err != nil { + framework.Logf("error connecting: %v", err) + return false, nil + } + clientIP, _, err = net.SplitHostPort(clientIPPort) + framework.ExpectNoError(err, "parsing clientip output") + return true, nil + }) + framework.ExpectNoError(err, "connecting to nodeport service") + if clientIP != execPod.Status.PodIP { + framework.Failf("Source IP %s is not the client IP", clientIP) + } + }) + ginkgo.It("should fail health check node port if there are only terminating endpoints", func(ctx context.Context) { // windows kube-proxy does not support this feature yet e2eskipper.SkipIfNodeOSDistroIs("windows") @@ -2722,7 +2794,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2763,7 +2835,7 @@ var _ = common.SIGDescribe("Services", func() { framework.ExpectNoError(err) // validate that the health check node port from kube-proxy returns 503 when there are no ready endpoints - err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) { + err = wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) { cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --max-time 5 http://%s/healthz`, healthCheckNodePortAddr) out, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd) if err != nil { @@ -2814,7 +2886,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2889,7 +2961,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -2966,7 +3038,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -3042,7 +3114,7 @@ var _ = common.SIGDescribe("Services", func() { ginkgo.By("Creating 1 webserver pod to be part of the TCP service") webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100") webserverPod0.Labels = jig.Labels - webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100) + webserverPod0.Spec.TerminationGracePeriodSeconds = ptr.To[int64](100) e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{}) @@ -3347,7 +3419,7 @@ var _ = common.SIGDescribe("Services", func() { Port: int32(80), TargetPort: intstr.FromInt32(80), }}, - LoadBalancerClass: utilpointer.String("example.com/internal-vip"), + LoadBalancerClass: ptr.To("example.com/internal-vip"), }, } _, err = cs.CoreV1().Services(ns).Create(ctx, &testService, metav1.CreateOptions{})