From 8e8bef66f6983e51cdc8df757cfa7330e2e7bbfe Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 31 May 2024 08:17:09 -0400 Subject: [PATCH 1/2] Refactor endpoint/execpod antiaffinity Also make launchHostExecPod return a fully-filled-in Pod object. --- test/e2e/network/service.go | 79 ++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 2f23756e6d9..8de3c8ad4e3 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -333,7 +333,7 @@ func StopServeHostnameService(ctx context.Context, clientset clientset.Interface // given expectedPods list after a sort | uniq. func verifyServeHostnameServiceUp(ctx context.Context, c clientset.Interface, ns string, expectedPods []string, serviceIP string, servicePort int) error { // to verify from host network - hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-up-host-exec-pod") + hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-up-host-exec-pod", nil) // to verify from container's network execPod := e2epod.CreateExecPodOrFail(ctx, c, ns, "verify-service-up-exec-pod-", nil) @@ -403,7 +403,7 @@ func verifyServeHostnameServiceUp(ctx context.Context, c clientset.Interface, ns // verifyServeHostnameServiceDown verifies that the given service isn't served. func verifyServeHostnameServiceDown(ctx context.Context, c clientset.Interface, ns string, serviceIP string, servicePort int) error { // verify from host network - hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-down-host-exec-pod") + hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-down-host-exec-pod", nil) defer func() { e2epod.DeletePodOrFail(ctx, c, ns, hostExecPod.Name) }() @@ -1706,7 +1706,7 @@ var _ = common.SIGDescribe("Services", func() { err = t.DeleteService(serviceName) framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns) - hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "hostexec") + hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "hostexec", nil) cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort) var stdout string if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) { @@ -2705,51 +2705,31 @@ var _ = common.SIGDescribe("Services", func() { jig := e2eservice.NewTestJig(cs, namespace, serviceName) ginkgo.By("creating the service") - svc, err := jig.CreateOnlyLocalNodePortService(ctx, false) + svc, err := jig.CreateOnlyLocalNodePortService(ctx, true) framework.ExpectNoError(err, "creating the service") tcpNodePort := int(svc.Spec.Ports[0].NodePort) nodePortStr := fmt.Sprintf("%d", tcpNodePort) framework.Logf("NodePort is %s", nodePortStr) - ginkgo.By("creating a HostNetwork exec pod") - execPod := launchHostExecPod(ctx, cs, namespace, "hostexec") - execPod, err = cs.CoreV1().Pods(namespace).Get(ctx, execPod.Name, metav1.GetOptions{}) - framework.ExpectNoError(err, "getting podIP of execPod") - framework.Logf("execPod IP is %q", execPod.Status.PodIP) - - ginkgo.By("creating an endpoint for the service on a different node from the execPod") - _, err = jig.Run(ctx, func(rc *v1.ReplicationController) { - rc.Spec.Template.Spec.Affinity = &v1.Affinity{ - // We need to ensure the endpoint is on a different node - // from the exec pod, to ensure that the source IP of the - // traffic is the node's "public" IP. 
For - // node-to-pod-on-same-node traffic, it might end up using - // the "docker0" IP or something like that. - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{{ - MatchFields: []v1.NodeSelectorRequirement{{ - Key: "metadata.name", - Operator: "NotIn", - Values: []string{execPod.Spec.NodeName}, - }}, - }}, - }, - }, - } - }) - framework.ExpectNoError(err, "creating the endpoint pod") - - // Extract the single endpoint node IP from a map of endpoint node IPs - var endpointNodeIP string + // Get the (single) endpoint's node name and IP + var endpointNodeName, endpointNodeIP string endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) framework.ExpectNoError(err, "fetching endpoint node IPs") for node, nodeIP := range endpointsNodeMap { framework.Logf("endpoint is on node %s (%s)", node, nodeIP) + endpointNodeName = node endpointNodeIP = nodeIP break } + // We need to ensure the endpoint is on a different node from the exec + // pod, to ensure that the source IP of the traffic is the node's "public" + // IP. For node-to-pod-on-same-node traffic, it might end up using the + // "docker0" IP or something like that. + ginkgo.By("creating a HostNetwork exec pod on a different node") + execPod := launchHostExecPod(ctx, cs, namespace, "hostexec", &endpointNodeName) + framework.Logf("execPod IP is %q", execPod.Status.PodIP) + ginkgo.By("connecting from the execpod to the NodePort on the endpoint's node") cmd := fmt.Sprintf("curl -g -q -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(endpointNodeIP, nodePortStr)) var clientIP string @@ -4184,14 +4164,33 @@ func createPodOrFail(ctx context.Context, f *framework.Framework, ns, name strin } // launchHostExecPod launches a hostexec pod in the given namespace and waits -// until it's Running -func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name string) *v1.Pod { +// until it's Running. If avoidNode is non-nil, it will ensure that the pod doesn't +// land on that node. 
+func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name string, avoidNode *string) *v1.Pod { framework.Logf("Creating new host exec pod") hostExecPod := e2epod.NewExecPodSpec(ns, name, true) - pod, err := client.CoreV1().Pods(ns).Create(ctx, hostExecPod, metav1.CreateOptions{}) - framework.ExpectNoError(err) + if avoidNode != nil { + hostExecPod.Spec.Affinity = &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{{ + MatchFields: []v1.NodeSelectorRequirement{{ + Key: "metadata.name", + Operator: "NotIn", + Values: []string{*avoidNode}, + }}, + }}, + }, + }, + } + } + _, err := client.CoreV1().Pods(ns).Create(ctx, hostExecPod, metav1.CreateOptions{}) + framework.ExpectNoError(err, "creating host exec pod") err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, client, name, ns, framework.PodStartTimeout) - framework.ExpectNoError(err) + framework.ExpectNoError(err, "waiting for host exec pod") + // re-fetch to get PodIP, etc + pod, err := client.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{}) + framework.ExpectNoError(err, "getting podIP of host exec pod") return pod } From a6b08a8ea41c9da659840afb3631152f0fb9c744 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Thu, 9 May 2024 15:53:59 -0400 Subject: [PATCH 2/2] Refactor "updates to ExternalTrafficPolicy" test The LoadBalancer test "should handle updates to ExternalTrafficPolicy field" had a bunch of problems, but the biggest is that (without doing [Disruptive] things to the cluster or making unwarranted assumptions about source IPs) it's very hard to *prove* that the cloud load balancer is doing Cluster traffic policy semantics (distributing connections to all nodes) rather than Local (distributing only to nodes with endpoints). So split the test into 2 new tests with more focused semantics: - "should implement NodePort and HealthCheckNodePort correctly when ExternalTrafficPolicy changes" (in service.go) tests that the service proxy is correctly implementing the proxy side of Cluster-vs-Local traffic policy for LoadBalancer Services, without testing the cloud load balancer itself at all. - "should target all nodes with endpoints" (in loadbalancer.go) complements the existing "should only target nodes with endpoints", to ensure that when a service has `externalTrafficPolicy: Local`, and there are endpoints on multiple nodes, that the cloud is correctly configured to target all of those endpoints, not just one. 
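As a rough illustration of the proxy-side behavior the new service.go test
relies on (a sketch only, not part of this patch): for a LoadBalancer
Service with `externalTrafficPolicy: Local`, the apiserver allocates
spec.healthCheckNodePort, and kube-proxy answers it with 200 on nodes that
have a ready local endpoint and 503 on nodes that do not; after the policy
is switched to Cluster, the port is deallocated and nothing answers. The
helper name, node IP, and port below are hypothetical; the test itself does
the equivalent with curl from a host-network exec pod.

package main

import (
	"fmt"
	"net"
	"net/http"
	"time"
)

// probeHealthCheckNodePort reports whether kube-proxy on nodeIP claims to
// have a ready local endpoint for the Service that owns hcNodePort.
// (Hypothetical helper, for illustration only.)
func probeHealthCheckNodePort(nodeIP string, hcNodePort int) (bool, error) {
	client := &http.Client{Timeout: 3 * time.Second}
	url := fmt.Sprintf("http://%s/", net.JoinHostPort(nodeIP, fmt.Sprintf("%d", hcNodePort)))
	resp, err := client.Get(url)
	if err != nil {
		// No response at all, e.g. after the policy changes to Cluster and
		// the HealthCheckNodePort is released.
		return false, err
	}
	defer resp.Body.Close()
	// 200 => this node has a local endpoint; 503 => it does not.
	return resp.StatusCode == http.StatusOK, nil
}

func main() {
	// Made-up node IP and allocated healthCheckNodePort.
	healthy, err := probeHealthCheckNodePort("10.0.0.1", 30123)
	fmt.Println(healthy, err)
}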
--- test/e2e/network/loadbalancer.go | 219 +++++++------------------------ test/e2e/network/service.go | 162 +++++++++++++++++++++++ 2 files changed, 210 insertions(+), 171 deletions(-) diff --git a/test/e2e/network/loadbalancer.go b/test/e2e/network/loadbalancer.go index cd869868b2a..5612a4b89d4 100644 --- a/test/e2e/network/loadbalancer.go +++ b/test/e2e/network/loadbalancer.go @@ -56,7 +56,6 @@ import ( "k8s.io/utils/ptr" "github.com/onsi/ginkgo/v2" - "github.com/onsi/gomega" ) // getInternalIP returns node internal IP @@ -990,7 +989,7 @@ var _ = common.SIGDescribe("LoadBalancers", feature.LoadBalancer, func() { var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature.LoadBalancer, framework.WithSlow(), func() { f := framework.NewDefaultFramework("esipp") - f.NamespacePodSecurityLevel = admissionapi.LevelBaseline + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged var loadBalancerCreateTimeout time.Duration var cs clientset.Interface @@ -1151,6 +1150,53 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature } }) + ginkgo.It("should target all nodes with endpoints", func(ctx context.Context) { + // FIXME: need a better platform-independent timeout + loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs) + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2) + framework.ExpectNoError(err) + if len(nodes.Items) == 1 { + e2eskipper.Skipf("Test requires multiple schedulable nodes") + } + + namespace := f.Namespace.Name + serviceName := "external-local-update" + jig := e2eservice.NewTestJig(cs, namespace, serviceName) + + ginkgo.By("creating the service") + svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, false, nil) + framework.ExpectNoError(err, "creating the service") + ingress := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + svcPort := int(svc.Spec.Ports[0].Port) + framework.Logf("ingress is %s:%d", ingress, svcPort) + + ginkgo.By("creating endpoints on multiple nodes") + _, err = jig.Run(ctx, func(rc *v1.ReplicationController) { + rc.Spec.Replicas = ptr.To[int32](2) + rc.Spec.Template.Spec.Affinity = &v1.Affinity{ + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{MatchLabels: jig.Labels}, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + } + }) + framework.ExpectNoError(err, "creating the endpoints") + + ginkgo.By("ensuring that the LoadBalancer targets all endpoints") + // We're not testing affinity here, but we can use checkAffinity(false) to + // test that affinity *isn't* enabled, which is to say, that connecting to + // ingress:svcPort multiple times eventually reaches at least 2 different + // endpoints. 
+ if !checkAffinity(ctx, cs, nil, ingress, svcPort, false) { + framework.Failf("Load balancer connections only reached one of the two endpoints") + } + }) + ginkgo.It("should work from pods", func(ctx context.Context) { var err error namespace := f.Namespace.Name @@ -1216,175 +1262,6 @@ var _ = common.SIGDescribe("LoadBalancers ExternalTrafficPolicy: Local", feature framework.Failf("Source IP not preserved from %v, expected '%v' got '%v'", pausePod.Name, pausePod.Status.PodIP, srcIP) } }) - - ginkgo.It("should handle updates to ExternalTrafficPolicy field", func(ctx context.Context) { - namespace := f.Namespace.Name - serviceName := "external-local-update" - jig := e2eservice.NewTestJig(cs, namespace, serviceName) - - nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests) - framework.ExpectNoError(err) - if len(nodes.Items) < 2 { - framework.Failf("Need at least 2 nodes to verify source ip from a node without endpoint") - } - - svc, err := jig.CreateOnlyLocalLoadBalancerService(ctx, loadBalancerCreateTimeout, true, nil) - framework.ExpectNoError(err) - ginkgo.DeferCleanup(func(ctx context.Context) { - err = jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, loadBalancerCreateTimeout) - framework.ExpectNoError(err) - err := cs.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) - framework.ExpectNoError(err) - }) - - // save the health check node port because it disappears when Local traffic policy is turned off. - healthCheckNodePort := int(svc.Spec.HealthCheckNodePort) - - ginkgo.By("changing ExternalTrafficPolicy to Cluster") - svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster - }) - framework.ExpectNoError(err) - if svc.Spec.HealthCheckNodePort > 0 { - framework.Failf("Service HealthCheck NodePort still present") - } - - epNodes, err := jig.ListNodesWithEndpoint(ctx) - framework.ExpectNoError(err) - // map from name of nodes with endpoint to internal ip - // it is assumed that there is only a single node with the endpoint - endpointNodeMap := make(map[string]string) - // map from name of nodes without endpoint to internal ip - noEndpointNodeMap := make(map[string]string) - for _, node := range epNodes { - ips := e2enode.GetAddresses(&node, v1.NodeInternalIP) - if len(ips) < 1 { - framework.Failf("No internal ip found for node %s", node.Name) - } - endpointNodeMap[node.Name] = ips[0] - } - for _, n := range nodes.Items { - ips := e2enode.GetAddresses(&n, v1.NodeInternalIP) - if len(ips) < 1 { - framework.Failf("No internal ip found for node %s", n.Name) - } - if _, ok := endpointNodeMap[n.Name]; !ok { - noEndpointNodeMap[n.Name] = ips[0] - } - } - gomega.Expect(endpointNodeMap).ToNot(gomega.BeEmpty()) - gomega.Expect(noEndpointNodeMap).ToNot(gomega.BeEmpty()) - - svcTCPPort := int(svc.Spec.Ports[0].Port) - svcNodePort := int(svc.Spec.Ports[0].NodePort) - ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) - path := "/clientip" - dialCmd := "clientip" - - config := e2enetwork.NewNetworkingTestConfig(ctx, f) - - ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap)) - for nodeName, nodeIP := range noEndpointNodeMap { - ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd)) - _, err := GetHTTPContentFromTestContainer(ctx, config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, 
dialCmd) - framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout) - } - - for nodeName, nodeIP := range endpointNodeMap { - ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP)) - var body string - pollFn := func(ctx context.Context) (bool, error) { - // we expect connection failure here, but not other errors - resp, err := config.GetResponseFromTestContainer(ctx, - "http", - "healthz", - nodeIP, - healthCheckNodePort) - if err != nil { - return false, nil - } - if len(resp.Errors) > 0 { - return true, nil - } - if len(resp.Responses) > 0 { - body = resp.Responses[0] - } - return false, nil - } - if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.TestTimeout, true, pollFn); pollErr != nil { - framework.Failf("Kube-proxy still exposing health check on node %v:%v, after traffic policy set to Cluster. body %s", - nodeName, healthCheckNodePort, body) - } - } - - // Poll till kube-proxy re-adds the MASQUERADE rule on the node. - ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP)) - var clientIP string - pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, 3*e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) { - clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path) - if err != nil { - return false, nil - } - // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port - host, _, err := net.SplitHostPort(clientIPPort) - if err != nil { - framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort) - return false, nil - } - ip := netutils.ParseIPSloppy(host) - if ip == nil { - framework.Logf("Invalid client IP address format: %q", host) - return false, nil - } - if subnetPrefix.Contains(ip) { - return true, nil - } - return false, nil - }) - if pollErr != nil { - framework.Failf("Source IP WAS preserved with Cluster traffic policy. Got %v, expected a cluster ip.", clientIP) - } - - // TODO: We need to attempt to create another service with the previously - // allocated healthcheck nodePort. If the health check nodePort has been - // freed, the new service creation will succeed, upon which we cleanup. - // If the health check nodePort has NOT been freed, the new service - // creation will fail. 
- - ginkgo.By("setting ExternalTrafficPolicy back to Local") - svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - // Request the same healthCheckNodePort as before, to test the user-requested allocation path - svc.Spec.HealthCheckNodePort = int32(healthCheckNodePort) - }) - framework.ExpectNoError(err) - loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs) - pollErr = wait.PollUntilContextTimeout(ctx, framework.PollShortTimeout, loadBalancerPropagationTimeout, true, func(ctx context.Context) (bool, error) { - clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path) - if err != nil { - return false, nil - } - ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort)) - // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port - host, _, err := net.SplitHostPort(clientIPPort) - if err != nil { - framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort) - return false, nil - } - ip := netutils.ParseIPSloppy(host) - if ip == nil { - framework.Logf("Invalid client IP address format: %q", host) - return false, nil - } - if !subnetPrefix.Contains(ip) { - return true, nil - } - return false, nil - }) - if pollErr != nil { - framework.Failf("Source IP (%v) is not the client IP after ExternalTrafficPolicy set back to Local, expected a public IP.", clientIP) - } - }) }) func ipToSourceRange(ip string) string { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 8de3c8ad4e3..11965b220e8 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -3914,6 +3914,168 @@ var _ = common.SIGDescribe("Services", func() { framework.Failf("The state of the sctp module has changed due to the test case") } }) + + ginkgo.It("should implement NodePort and HealthCheckNodePort correctly when ExternalTrafficPolicy changes", func(ctx context.Context) { + // This uses a LoadBalancer service but only tests the parts of it that are + // managed by the service proxy; there is another test in loadbalancer.go + // to test the load balancer parts. 
+ + namespace := f.Namespace.Name + serviceName := "external-local-update" + jig := e2eservice.NewTestJig(cs, namespace, serviceName) + + nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 3) + framework.ExpectNoError(err, "listing nodes") + if len(nodes.Items) != 3 { + e2eskipper.Skipf("Need at least 3 schedulable nodes for this test") + } + + ginkgo.By("creating the service") + svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }) + framework.ExpectNoError(err, "creating the service") + nodePortStr := fmt.Sprintf("%d", svc.Spec.Ports[0].NodePort) + hcNodePortStr := fmt.Sprintf("%d", svc.Spec.HealthCheckNodePort) + framework.Logf("NodePort is %s, HealthCheckNodePort is %s", nodePortStr, hcNodePortStr) + + ginkgo.By("creating an endpoint for the service") + _, err = jig.Run(ctx, nil) + framework.ExpectNoError(err, "creating the endpoint") + + var endpointNode, endpointNodeIP string + endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig) + framework.ExpectNoError(err, "fetching endpoint node IP") + for node, nodeIP := range endpointsNodeMap { + framework.Logf("endpoint is on node %s (%s)", node, nodeIP) + endpointNode = node + endpointNodeIP = nodeIP + break + } + + ginkgo.By("creating a HostNetwork exec pod on a different node from the endpoint") + execPod := launchHostExecPod(ctx, cs, namespace, "hostexec", &endpointNode) + hostExecPodNodeIP := execPod.Status.HostIP + + ginkgo.By("finding a third node with neither the endpoint nor the hostexec pod") + var thirdNodeIP string + for i := range nodes.Items { + node := &nodes.Items[i] + ips := e2enode.GetAddresses(node, v1.NodeInternalIP) + if len(ips) < 1 { + framework.Failf("No internal ip found for node %s", node.Name) + } + if ips[0] != endpointNodeIP && ips[0] != hostExecPodNodeIP { + thirdNodeIP = ips[0] + break + } + } + gomega.Expect(thirdNodeIP).NotTo(gomega.Equal(""), "cluster has at least 3 nodes but could not find a third node IP?") + + checkOneNodePort := func(nodeIP string, expectResponse bool, trafficPolicy v1.ServiceExternalTrafficPolicy, deadline time.Time) { + cmd := fmt.Sprintf("curl -g -s --connect-timeout 3 http://%s/clientip", net.JoinHostPort(nodeIP, nodePortStr)) + var clientIP string + err := wait.PollUntilContextTimeout(ctx, framework.Poll, time.Until(deadline), true, func(ctx context.Context) (bool, error) { + out, err := e2eoutput.RunHostCmd(namespace, execPod.Name, cmd) + if !expectResponse { + return err != nil, nil + } else if err != nil { + return false, nil + } + + clientIP, _, err = net.SplitHostPort(out) + if err != nil { + return false, fmt.Errorf("could not parse clientip response %q: %w", out, err) + } + return true, nil + }) + framework.ExpectNoError(err, "expected correct response within timeout") + if expectResponse && trafficPolicy == v1.ServiceExternalTrafficPolicyLocal { + gomega.Expect(clientIP).To(gomega.Equal(hostExecPodNodeIP), "expected client IP to be preserved") + } + } + + checkOneHealthCheck := func(nodeIP string, expectResponse bool, expectStatus string, deadline time.Time) { + // "-i" means to return the HTTP headers in the response + cmd := fmt.Sprintf("curl -g -i -s --connect-timeout 3 http://%s/", net.JoinHostPort(nodeIP, hcNodePortStr)) + err := wait.PollUntilContextTimeout(ctx, framework.Poll, time.Until(deadline), true, func(ctx context.Context) (bool, error) { + out, err := e2eoutput.RunHostCmd(namespace, execPod.Name, cmd) + if !expectResponse { + 
return err != nil, nil + } else if err != nil { + return false, nil + } + + status := strings.Split(out, "\n")[0] + gotResponse := strings.Contains(status, expectStatus) + return gotResponse, nil + }) + framework.ExpectNoError(err, "expected response containing %s", expectStatus) + } + + // (In each round of checks, all nodes must be correct within a single + // KubeProxyEndpointLagTimeout; we don't want to allow a full + // KubeProxyEndpointLagTimeout interval for each checkOneNodePort call.) + deadline := time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout) + + ginkgo.By("ensuring that the HealthCheckNodePort reports healthy on the endpoint node when ExternalTrafficPolicy is Local") + checkOneHealthCheck(endpointNodeIP, true, "200 OK", deadline) + ginkgo.By("ensuring that the HealthCheckNodePort reports UN-healthy on the execpod node when ExternalTrafficPolicy is Local") + checkOneHealthCheck(hostExecPodNodeIP, true, "503 Service Unavailable", deadline) + ginkgo.By("ensuring that the HealthCheckNodePort reports UN-healthy on the third node when ExternalTrafficPolicy is Local") + checkOneHealthCheck(thirdNodeIP, true, "503 Service Unavailable", deadline) + + ginkgo.By("ensuring that the NodePort responds on the endpoint node when ExternalTrafficPolicy is Local") + checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline) + ginkgo.By("ensuring that the NodePort responds (due to short-circuit rule) on the execpod node when ExternalTrafficPolicy is Local") + checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline) + ginkgo.By("ensuring that the NodePort does not respond on the third node when ExternalTrafficPolicy is Local") + checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline) + + ginkgo.By("changing ExternalTrafficPolicy to Cluster") + oldHealthCheckNodePort := svc.Spec.HealthCheckNodePort + svc, err = jig.UpdateService(ctx, func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyCluster + }) + framework.ExpectNoError(err, "updating Service") + if svc.Spec.HealthCheckNodePort > 0 { + framework.Failf("Service HealthCheck NodePort still present") + } + deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout) + + // FIXME: this is racy; we need to use a reserved HCNP here. 
+ ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the endpoint node when ExternalTrafficPolicy is Cluster") + checkOneHealthCheck(endpointNodeIP, false, "", deadline) + ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the execpod node when ExternalTrafficPolicy is Cluster") + checkOneHealthCheck(hostExecPodNodeIP, false, "", deadline) + ginkgo.By("ensuring that the HealthCheckNodePort no longer responds on the third node when ExternalTrafficPolicy is Cluster") + checkOneHealthCheck(thirdNodeIP, false, "", deadline) + + ginkgo.By("ensuring that the NodePort responds on the endpoint node when ExternalTrafficPolicy is Cluster") + checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline) + ginkgo.By("ensuring that the NodePort responds on the execpod node when ExternalTrafficPolicy is Cluster") + checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline) + ginkgo.By("ensuring that the NodePort responds on the third node when ExternalTrafficPolicy is Cluster") + checkOneNodePort(thirdNodeIP, true, v1.ServiceExternalTrafficPolicyCluster, deadline) + + ginkgo.By("changing ExternalTrafficPolicy back to Local") + _, err = jig.UpdateService(ctx, func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + // Request the same healthCheckNodePort as before, to test the user-requested allocation path + // FIXME: we need to use a reserved HCNP here. + svc.Spec.HealthCheckNodePort = oldHealthCheckNodePort + }) + framework.ExpectNoError(err, "updating ExternalTrafficPolicy and HealthCheckNodePort") + deadline = time.Now().Add(e2eservice.KubeProxyEndpointLagTimeout) + + ginkgo.By("ensuring that all NodePorts and HealthCheckNodePorts show the correct behavior again after changing ExternalTrafficPolicy back to Local") + checkOneHealthCheck(endpointNodeIP, true, "200 OK", deadline) + checkOneHealthCheck(hostExecPodNodeIP, true, "503 Service Unavailable", deadline) + checkOneHealthCheck(thirdNodeIP, true, "503 Service Unavailable", deadline) + checkOneNodePort(endpointNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline) + checkOneNodePort(hostExecPodNodeIP, true, v1.ServiceExternalTrafficPolicyLocal, deadline) + checkOneNodePort(thirdNodeIP, false, v1.ServiceExternalTrafficPolicyLocal, deadline) + }) }) // execAffinityTestForSessionAffinityTimeout is a helper function that wrap the logic of