diff --git a/test/e2e/framework/network/utils.go b/test/e2e/framework/network/utils.go
index 4182c56d811..64c864d1ced 100644
--- a/test/e2e/framework/network/utils.go
+++ b/test/e2e/framework/network/utils.go
@@ -160,6 +160,12 @@ type NetworkingTestConfig struct {
 	Namespace string
 }
 
+// NetexecDialResponse represents the response returned by the `netexec` subcommand of `agnhost`
+type NetexecDialResponse struct {
+	Responses []string `json:"responses"`
+	Errors    []string `json:"errors"`
+}
+
 // DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container.
 func (config *NetworkingTestConfig) DialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
 	config.DialFromContainer(protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
@@ -212,6 +218,18 @@ func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
 	return expectedEps
 }
 
+func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
+	// The current versions of curl included in CentOS and RHEL distros
+	// misinterpret square brackets around IPv6 as globbing, so use the -g
+	// argument to disable globbing to handle the IPv6 case.
+	return fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
+		ipPort,
+		dialCmd,
+		protocol,
+		targetIP,
+		targetPort)
+}
+
 // DialFromContainer executes a curl via kubectl exec in a test container,
 // which might then translate to a tcp or udp request based on the protocol
 // argument in the url.
@@ -232,38 +250,23 @@ func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
 // pod and confirm it doesn't show up as an endpoint.
 func (config *NetworkingTestConfig) DialFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) {
 	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
-	// The current versions of curl included in CentOS and RHEL distros
-	// misinterpret square brackets around IPv6 as globbing, so use the -g
-	// argument to disable globbing to handle the IPv6 case.
-	cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
-		ipPort,
-		dialCommand,
-		protocol,
-		targetIP,
-		targetPort)
+	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
 
 	responses := sets.NewString()
 	for i := 0; i < maxTries; i++ {
-		stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
+		resp, err := config.GetResponseFromContainer(protocol, dialCommand, containerIP, targetIP, containerHTTPPort, targetPort)
 		if err != nil {
 			// A failure to kubectl exec counts as a try, not a hard fail.
 			// Also note that we will keep failing for maxTries in tests where
 			// we confirm unreachability.
-			framework.Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr)
-		} else {
-			var output map[string][]string
-			if err := json.Unmarshal([]byte(stdout), &output); err != nil {
-				framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
-					cmd, config.TestContainerPod.Name, stdout, err)
-				continue
-			}
-
-			for _, response := range output["responses"] {
-				trimmed := strings.TrimSpace(response)
-				if trimmed != "" {
-					responses.Insert(trimmed)
-				}
+			framework.Logf("GetResponseFromContainer: %s", err)
+			continue
+		}
+		for _, response := range resp.Responses {
+			trimmed := strings.TrimSpace(response)
+			if trimmed != "" {
+				responses.Insert(trimmed)
 			}
 		}
 		framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses))
@@ -294,14 +297,7 @@ func (config *NetworkingTestConfig) GetEndpointsFromTestContainer(protocol, targ
 // we don't see any endpoints, the test fails.
 func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containerIP, targetIP string, containerHTTPPort, targetPort, tries int) (sets.String, error) {
 	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
-	// The current versions of curl included in CentOS and RHEL distros
-	// misinterpret square brackets around IPv6 as globbing, so use the -g
-	// argument to disable globbing to handle the IPv6 case.
-	cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=1'",
-		ipPort,
-		protocol,
-		targetIP,
-		targetPort)
+	cmd := makeCURLDialCommand(ipPort, "hostName", protocol, targetIP, targetPort)
 
 	eps := sets.NewString()
 
@@ -314,14 +310,14 @@ func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containe
 			framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
 		} else {
 			framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in: %#v", tries, i, stdout, stderr, config.TestContainerPod)
-			var output map[string][]string
+			var output NetexecDialResponse
 			if err := json.Unmarshal([]byte(stdout), &output); err != nil {
 				framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
 					cmd, config.TestContainerPod.Name, stdout, err)
 				continue
 			}
-			for _, hostName := range output["responses"] {
+			for _, hostName := range output.Responses {
 				trimmed := strings.TrimSpace(hostName)
 				if trimmed != "" {
 					eps.Insert(trimmed)
@@ -334,6 +330,50 @@ func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containe
 	return eps, nil
 }
 
+// GetResponseFromContainer executes a curl via kubectl exec in a container.
+func (config *NetworkingTestConfig) GetResponseFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort int) (NetexecDialResponse, error) {
+	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
+	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
+
+	stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
+	if err != nil {
+		return NetexecDialResponse{}, fmt.Errorf("failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
+	}
+
+	var output NetexecDialResponse
+	if err := json.Unmarshal([]byte(stdout), &output); err != nil {
+		return NetexecDialResponse{}, fmt.Errorf("failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
+			cmd, config.TestContainerPod.Name, stdout, err)
+	}
+	return output, nil
+}
+
+// GetResponseFromTestContainer executes a curl via kubectl exec in a test container.
+func (config *NetworkingTestConfig) GetResponseFromTestContainer(protocol, dialCommand, targetIP string, targetPort int) (NetexecDialResponse, error) {
+	return config.GetResponseFromContainer(protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort)
+}
+
+// GetHTTPCodeFromTestContainer executes a curl via kubectl exec in a test container and returns the status code.
+func (config *NetworkingTestConfig) GetHTTPCodeFromTestContainer(path, targetIP string, targetPort int) (int, error) {
+	cmd := fmt.Sprintf("curl -g -q -s -o /dev/null -w %%{http_code} http://%s:%d%s",
+		targetIP,
+		targetPort,
+		path)
+	stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
+	// We only care about the status code reported by curl,
+	// and want to return any other errors, such as cannot execute command in the Pod.
+	// If curl failed to connect to host, it would exit with code 7, which makes `ExecShellInPodWithFullOutput`
+	// return a non-nil error and output "000" to stdout.
+	if err != nil && len(stdout) == 0 {
+		return 0, fmt.Errorf("failed to execute %q: %v, stderr: %q", cmd, err, stderr)
+	}
+	code, err := strconv.Atoi(stdout)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse status code returned by healthz endpoint: %w, code: %s", err, stdout)
+	}
+	return code, nil
+}
+
 // DialFromNode executes a tcp or udp request based on protocol via kubectl exec
 // in a test container running with host networking.
 // - minTries is the minimum number of curl attempts required before declaring
diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go
index caa383a2297..b1c48b1376f 100644
--- a/test/e2e/framework/service/jig.go
+++ b/test/e2e/framework/service/jig.go
@@ -260,23 +260,44 @@ func (j *TestJig) CreateLoadBalancerService(timeout time.Duration, tweak func(sv
 // GetEndpointNodes returns a map of nodenames:external-ip on which the
 // endpoints of the Service are running.
 func (j *TestJig) GetEndpointNodes() (map[string][]string, error) {
-	nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
-	if err != nil {
-		return nil, err
-	}
-	epNodes, err := j.GetEndpointNodeNames()
+	return j.GetEndpointNodesWithIP(v1.NodeExternalIP)
+}
+
+// GetEndpointNodesWithIP returns a map of nodenames:<ip of the given address type> on which the
+// endpoints of the Service are running.
+func (j *TestJig) GetEndpointNodesWithIP(addressType v1.NodeAddressType) (map[string][]string, error) {
+	nodes, err := j.ListNodesWithEndpoint()
 	if err != nil {
 		return nil, err
 	}
 	nodeMap := map[string][]string{}
-	for _, n := range nodes.Items {
-		if epNodes.Has(n.Name) {
-			nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
-		}
+	for _, node := range nodes {
+		nodeMap[node.Name] = e2enode.GetAddresses(&node, addressType)
 	}
 	return nodeMap, nil
 }
 
+// ListNodesWithEndpoint returns a list of nodes on which the
+// endpoints of the given Service are running.
+func (j *TestJig) ListNodesWithEndpoint() ([]v1.Node, error) {
+	nodeNames, err := j.GetEndpointNodeNames()
+	if err != nil {
+		return nil, err
+	}
+	ctx := context.TODO()
+	allNodes, err := j.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	epNodes := make([]v1.Node, 0, nodeNames.Len())
+	for _, node := range allNodes.Items {
+		if nodeNames.Has(node.Name) {
+			epNodes = append(epNodes, node)
+		}
+	}
+	return epNodes, nil
+}
+
 // GetEndpointNodeNames returns a string set of node names on which the
 // endpoints of the given Service are running.
 func (j *TestJig) GetEndpointNodeNames() (sets.String, error) {
diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index 51f3dda1dac..7b94f093823 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -17,7 +17,6 @@ limitations under the License.
 package network
 
 import (
-	"bytes"
 	"context"
 	"encoding/json"
 	"errors"
@@ -615,6 +614,32 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err
 	return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
 }
 
+func testHTTPHealthCheckNodePortFromTestContainer(config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, expectSucceed bool, threshold int) error {
+	count := 0
+	pollFn := func() (bool, error) {
+		statusCode, err := config.GetHTTPCodeFromTestContainer(
+			"/healthz",
+			host,
+			port)
+		if err != nil {
+			framework.Logf("Got error reading status code from http://%s:%d/healthz via test container: %v", host, port, err)
+			return false, nil
+		}
+		framework.Logf("Got status code from http://%s:%d/healthz via test container: %d", host, port, statusCode)
+		success := statusCode == 200
+		if (success && expectSucceed) ||
+			(!success && !expectSucceed) {
+			count++
+		}
+		return count >= threshold, nil
+	}
+	err := wait.PollImmediate(time.Second, timeout, pollFn)
+	if err != nil {
+		return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v/healthz, got %d", threshold, expectSucceed, host, port, count)
+	}
+	return nil
+}
+
 // Does an HTTP GET, but does not reuse TCP connections
 // This masks problems where the iptables rule has changed, but we don't see it
 func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
@@ -683,6 +708,23 @@ func waitForApiserverUp(c clientset.Interface) error {
 	return fmt.Errorf("waiting for apiserver timed out")
 }
 
+// getEndpointNodesWithInternalIP returns a map of nodenames:internal-ip on which the
+// endpoints of the Service are running.
+func getEndpointNodesWithInternalIP(jig *e2eservice.TestJig) (map[string]string, error) {
+	nodesWithIPs, err := jig.GetEndpointNodesWithIP(v1.NodeInternalIP)
+	if err != nil {
+		return nil, err
+	}
+	endpointsNodeMap := make(map[string]string, len(nodesWithIPs))
+	for nodeName, internalIPs := range nodesWithIPs {
+		if len(internalIPs) < 1 {
+			return nil, fmt.Errorf("no internal ip found for node %s", nodeName)
+		}
+		endpointsNodeMap[nodeName] = internalIPs[0]
+	}
+	return endpointsNodeMap, nil
+}
+
 var _ = SIGDescribe("Services", func() {
 	f := framework.NewDefaultFramework("services")
@@ -2885,11 +2927,18 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 			framework.ExpectNoError(err)
 
 			// Make sure we didn't leak the health check node port.
-			threshold := 2
-			nodes, err := jig.GetEndpointNodes()
+			const threshold = 2
+			nodes, err := getEndpointNodesWithInternalIP(jig)
 			framework.ExpectNoError(err)
-			for _, ips := range nodes {
-				err := TestHTTPHealthCheckNodePort(ips[0], healthCheckNodePort, "/healthz", e2eservice.KubeProxyEndpointLagTimeout, false, threshold)
+			config := e2enetwork.NewNetworkingTestConfig(f, false, false)
+			for _, internalIP := range nodes {
+				err := testHTTPHealthCheckNodePortFromTestContainer(
+					config,
+					internalIP,
+					healthCheckNodePort,
+					e2eservice.KubeProxyLagTimeout,
+					false,
+					threshold)
 				framework.ExpectNoError(err)
 			}
 			err = cs.CoreV1().Services(svc.Namespace).Delete(context.TODO(), svc.Name, metav1.DeleteOptions{})
@@ -2923,17 +2972,20 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 		}()
 
 		tcpNodePort := int(svc.Spec.Ports[0].NodePort)
-		endpointsNodeMap, err := jig.GetEndpointNodes()
-		framework.ExpectNoError(err)
-		path := "/clientip"
-		for nodeName, nodeIPs := range endpointsNodeMap {
-			nodeIP := nodeIPs[0]
-			ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v%v%v", nodeName, nodeIP, tcpNodePort, path))
-			content := GetHTTPContent(nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, path)
-			clientIP := content.String()
-			framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
-			if strings.HasPrefix(clientIP, "10.") {
+		endpointsNodeMap, err := getEndpointNodesWithInternalIP(jig)
+		framework.ExpectNoError(err)
+
+		dialCmd := "clientip"
+		config := e2enetwork.NewNetworkingTestConfig(f, false, false)
+
+		for nodeName, nodeIP := range endpointsNodeMap {
+			ginkgo.By(fmt.Sprintf("reading clientIP using the TCP service's NodePort, on node %v: %v:%v/%v", nodeName, nodeIP, tcpNodePort, dialCmd))
+			clientIP, err := GetHTTPContentFromTestContainer(config, nodeIP, tcpNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
+			framework.ExpectNoError(err)
+			framework.Logf("ClientIP detected by target pod using NodePort is %s, the ip of test container is %s", clientIP, config.TestContainerPod.Status.PodIP)
+			// the clientIP returned by agnhost contains port
+			if !strings.HasPrefix(clientIP, config.TestContainerPod.Status.PodIP) {
 				framework.Failf("Source IP was NOT preserved")
 			}
 		}
@@ -2970,13 +3022,13 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 			framework.Failf("Service HealthCheck NodePort was not allocated")
 		}
 
-		ips := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
+		ips := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
 
 		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
 		svcTCPPort := int(svc.Spec.Ports[0].Port)
 
-		threshold := 2
-		path := "/healthz"
+		const threshold = 2
+		config := e2enetwork.NewNetworkingTestConfig(f, false, false)
 		for i := 0; i < len(nodes.Items); i++ {
 			endpointNodeName := nodes.Items[i].Name
@@ -2995,15 +3047,21 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 
 			// HealthCheck should pass only on the node where num(endpoints) > 0
 			// All other nodes should fail the healthcheck on the service healthCheckNodePort
-			for n, publicIP := range ips {
+			for n, internalIP := range ips {
 				// Make sure the loadbalancer picked up the health check change.
 				// Confirm traffic can reach backend through LB before checking healthcheck nodeport.
 				e2eservice.TestReachableHTTP(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout)
 				expectedSuccess := nodes.Items[n].Name == endpointNodeName
 				port := strconv.Itoa(healthCheckNodePort)
-				ipPort := net.JoinHostPort(publicIP, port)
-				framework.Logf("Health checking %s, http://%s%s, expectedSuccess %v", nodes.Items[n].Name, ipPort, path, expectedSuccess)
-				err := TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, path, e2eservice.KubeProxyEndpointLagTimeout, expectedSuccess, threshold)
+				ipPort := net.JoinHostPort(internalIP, port)
+				framework.Logf("Health checking %s, http://%s/healthz, expectedSuccess %v", nodes.Items[n].Name, ipPort, expectedSuccess)
+				err := testHTTPHealthCheckNodePortFromTestContainer(
+					config,
+					internalIP,
+					healthCheckNodePort,
+					e2eservice.KubeProxyEndpointLagTimeout,
+					expectedSuccess,
+					threshold)
 				framework.ExpectNoError(err)
 			}
 			framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(f.ClientSet, namespace, serviceName))
@@ -3069,8 +3127,7 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 		}
 	})
 
-	// TODO: Get rid of [DisabledForLargeClusters] tag when issue #90047 is fixed.
-	ginkgo.It("should handle updates to ExternalTrafficPolicy field [DisabledForLargeClusters]", func() {
+	ginkgo.It("should handle updates to ExternalTrafficPolicy field", func() {
 		namespace := f.Namespace.Name
 		serviceName := "external-local-update"
 		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
@@ -3103,42 +3160,71 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 			framework.Failf("Service HealthCheck NodePort still present")
 		}
 
-		endpointNodeMap, err := jig.GetEndpointNodes()
+		epNodes, err := jig.ListNodesWithEndpoint()
 		framework.ExpectNoError(err)
-
-		noEndpointNodeMap := map[string][]string{}
-		for _, n := range nodes.Items {
-			if _, ok := endpointNodeMap[n.Name]; ok {
-				continue
+		// map from name of nodes with endpoint to internal ip
+		// it is assumed that there is only a single node with the endpoint
+		endpointNodeMap := make(map[string]string)
+		// map from name of nodes without endpoint to internal ip
+		noEndpointNodeMap := make(map[string]string)
+		for _, node := range epNodes {
+			ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
+			if len(ips) < 1 {
+				framework.Failf("No internal ip found for node %s", node.Name)
 			}
-			noEndpointNodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
+			endpointNodeMap[node.Name] = ips[0]
 		}
+		for _, n := range nodes.Items {
+			ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
+			if len(ips) < 1 {
+				framework.Failf("No internal ip found for node %s", n.Name)
+			}
+			if _, ok := endpointNodeMap[n.Name]; !ok {
+				noEndpointNodeMap[n.Name] = ips[0]
+			}
+		}
+		framework.ExpectNotEqual(len(endpointNodeMap), 0)
+		framework.ExpectNotEqual(len(noEndpointNodeMap), 0)
 
 		svcTCPPort := int(svc.Spec.Ports[0].Port)
 		svcNodePort := int(svc.Spec.Ports[0].NodePort)
 		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
 		path := "/clientip"
+		dialCmd := "clientip"
+
+		config := e2enetwork.NewNetworkingTestConfig(f, false, false)
 
 		ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
-		for nodeName, nodeIPs := range noEndpointNodeMap {
-			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], svcNodePort, path))
-			GetHTTPContent(nodeIPs[0], svcNodePort, e2eservice.KubeProxyLagTimeout, path)
+		for nodeName, nodeIP := range noEndpointNodeMap {
+			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP, svcNodePort, dialCmd))
+			_, err := GetHTTPContentFromTestContainer(config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
+			framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
 		}
 
-		for nodeName, nodeIPs := range endpointNodeMap {
-			ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
-			var body bytes.Buffer
-			pollfn := func() (bool, error) {
-				result := e2enetwork.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil)
-				if result.Code == 0 {
+		for nodeName, nodeIP := range endpointNodeMap {
+			ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
+			var body string
+			pollFn := func() (bool, error) {
+				// we expect connection failure here, but not other errors
+				resp, err := config.GetResponseFromTestContainer(
+					"http",
+					"healthz",
+					nodeIP,
+					healthCheckNodePort)
+				if err != nil {
+					return false, nil
+				}
+				if len(resp.Errors) > 0 {
 					return true, nil
 				}
-				body.Reset()
-				body.Write(result.Body)
+				if len(resp.Responses) > 0 {
+					body = resp.Responses[0]
+				}
 				return false, nil
 			}
-			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollfn); pollErr != nil {
+			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil {
 				framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
-					nodeName, healthCheckNodePort, body.String())
+					nodeName, healthCheckNodePort, body)
 			}
 		}
diff --git a/test/e2e/network/util.go b/test/e2e/network/util.go
index ec6048c7b1e..7e9b03b7895 100644
--- a/test/e2e/network/util.go
+++ b/test/e2e/network/util.go
@@ -53,6 +53,23 @@ func GetHTTPContent(host string, port int, timeout time.Duration, url string) by
 	return body
 }
 
+// GetHTTPContentFromTestContainer returns the content of the given url by HTTP via a test container.
+func GetHTTPContentFromTestContainer(config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, dialCmd string) (string, error) {
+	var body string
+	pollFn := func() (bool, error) {
+		resp, err := config.GetResponseFromTestContainer("http", dialCmd, host, port)
+		if err != nil || len(resp.Errors) > 0 || len(resp.Responses) == 0 {
+			return false, nil
+		}
+		body = resp.Responses[0]
+		return true, nil
+	}
+	if pollErr := wait.PollImmediate(framework.Poll, timeout, pollFn); pollErr != nil {
+		return "", pollErr
+	}
+	return body, nil
+}
+
 // DescribeSvc logs the output of kubectl describe svc for the given namespace
 func DescribeSvc(ns string) {
 	framework.Logf("\nOutput of kubectl describe svc:\n")
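
A minimal usage sketch of how the new in-cluster helpers compose (illustrative only, not part of the patch). The helper name checkHealthCheckNodePortFromCluster and its healthCheckNodePort parameter are hypothetical; f and jig stand for the framework.Framework and e2eservice.TestJig that the surrounding tests already set up.

	// checkHealthCheckNodePortFromCluster dials each endpoint node's
	// healthCheckNodePort over its internal IP, from inside the cluster,
	// so the probe also works on clusters whose nodes have no external IP.
	func checkHealthCheckNodePortFromCluster(f *framework.Framework, jig *e2eservice.TestJig, healthCheckNodePort int) {
		// The test container is the in-cluster vantage point for all probes.
		config := e2enetwork.NewNetworkingTestConfig(f, false, false)
		// Resolve endpoint nodes by internal IP instead of external IP.
		nodes, err := jig.GetEndpointNodesWithIP(v1.NodeInternalIP)
		framework.ExpectNoError(err)
		for nodeName, ips := range nodes {
			if len(ips) == 0 {
				framework.Failf("no internal ip found for node %s", nodeName)
			}
			// GetHTTPCodeFromTestContainer runs curl in the test container and
			// returns only the HTTP status code (curl's "000" parses to 0 when
			// the connection fails).
			code, err := config.GetHTTPCodeFromTestContainer("/healthz", ips[0], healthCheckNodePort)
			framework.ExpectNoError(err)
			framework.Logf("healthz on %s (%s:%d) returned %d", nodeName, ips[0], healthCheckNodePort, code)
		}
	}

This mirrors what testHTTPHealthCheckNodePortFromTestContainer in service.go does, minus the polling and threshold bookkeeping; the same config can then be reused for the /clientip checks via GetHTTPContentFromTestContainer, which matters because agnhost's clientip endpoint returns "ip:port" and the test asserts only the pod-IP prefix.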