From 708fb6b457d94c770890731e53090a975f53679a Mon Sep 17 00:00:00 2001
From: knight42
Date: Fri, 29 May 2020 12:00:40 +0800
Subject: [PATCH] fix(e2e): access nodes via test container in LB network tests

Signed-off-by: knight42
---
 test/e2e/framework/network/utils.go | 85 +++++++++++++++++++----------
 test/e2e/framework/service/jig.go   | 33 ++++++++---
 test/e2e/network/service.go         | 69 ++++++++++++++++-------
 test/e2e/network/util.go            | 17 ++++++
 4 files changed, 146 insertions(+), 58 deletions(-)

diff --git a/test/e2e/framework/network/utils.go b/test/e2e/framework/network/utils.go
index f7e83ef1242..8bcb9a76705 100644
--- a/test/e2e/framework/network/utils.go
+++ b/test/e2e/framework/network/utils.go
@@ -160,6 +160,12 @@ type NetworkingTestConfig struct {
 	Namespace string
 }

+// NetexecDialResponse represents the response returned by the `netexec` subcommand of `agnhost`
+type NetexecDialResponse struct {
+	Responses []string `json:"responses"`
+	Errors    []string `json:"errors"`
+}
+
 // DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container.
 func (config *NetworkingTestConfig) DialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
 	config.DialFromContainer(protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
 }
@@ -212,6 +218,18 @@ func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
 	return expectedEps
 }

+func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
+	// The current versions of curl included in CentOS and RHEL distros
+	// misinterpret square brackets around IPv6 as globbing, so use the -g
+	// argument to disable globbing to handle the IPv6 case.
+	return fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
+		ipPort,
+		dialCmd,
+		protocol,
+		targetIP,
+		targetPort)
+}
+
 // DialFromContainer executes a curl via kubectl exec in a test container,
 // which might then translate to a tcp or udp request based on the protocol
 // argument in the url.
@@ -232,38 +250,19 @@ func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
 // pod and confirm it doesn't show up as an endpoint.
 func (config *NetworkingTestConfig) DialFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) {
 	ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
-	// The current versions of curl included in CentOS and RHEL distros
-	// misinterpret square brackets around IPv6 as globbing, so use the -g
-	// argument to disable globbing to handle the IPv6 case.
-	cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
-		ipPort,
-		dialCommand,
-		protocol,
-		targetIP,
-		targetPort)
+	cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)

 	responses := sets.NewString()

 	for i := 0; i < maxTries; i++ {
-		stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
+		resp, err := config.GetResponseFromContainer(protocol, dialCommand, containerIP, targetIP, containerHTTPPort, targetPort)
 		if err != nil {
-			// A failure to kubectl exec counts as a try, not a hard fail.
-			// Also note that we will keep failing for maxTries in tests where
-			// we confirm unreachability.
- framework.Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr) - } else { - var output map[string][]string - if err := json.Unmarshal([]byte(stdout), &output); err != nil { - framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v", - cmd, config.HostTestContainerPod.Name, stdout, err) - continue - } - - for _, response := range output["responses"] { - trimmed := strings.TrimSpace(response) - if trimmed != "" { - responses.Insert(trimmed) - } + continue + } + for _, response := range resp.Responses { + trimmed := strings.TrimSpace(response) + if trimmed != "" { + responses.Insert(trimmed) } } framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses)) @@ -314,14 +313,14 @@ func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containe framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr) } else { framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in: %#v", tries, i, stdout, stderr, config.HostTestContainerPod) - var output map[string][]string + var output NetexecDialResponse if err := json.Unmarshal([]byte(stdout), &output); err != nil { framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v", cmd, config.HostTestContainerPod.Name, stdout, err) continue } - for _, hostName := range output["responses"] { + for _, hostName := range output.Responses { trimmed := strings.TrimSpace(hostName) if trimmed != "" { eps.Insert(trimmed) @@ -334,6 +333,34 @@ func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containe return eps, nil } +// GetResponseFromContainer executes a curl via kubectl exec in a container. +func (config *NetworkingTestConfig) GetResponseFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort int) (NetexecDialResponse, error) { + ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort)) + cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort) + + stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd) + if err != nil { + // A failure to kubectl exec counts as a try, not a hard fail. + // Also note that we will keep failing for maxTries in tests where + // we confirm unreachability. + framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr) + return NetexecDialResponse{}, err + } + + var output NetexecDialResponse + if err := json.Unmarshal([]byte(stdout), &output); err != nil { + framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v", + cmd, config.HostTestContainerPod.Name, stdout, err) + return NetexecDialResponse{}, err + } + return output, nil +} + +// GetResponseFromTestContainer executes a curl via kubectl exec in a test container. +func (config *NetworkingTestConfig) GetResponseFromTestContainer(protocol, dialCommand, targetIP string, targetPort int) (NetexecDialResponse, error) { + return config.GetResponseFromContainer(protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort) +} + // DialFromNode executes a tcp or udp request based on protocol via kubectl exec // in a test container running with host networking. 
 // - minTries is the minimum number of curl attempts required before declaring
diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go
index affe76af9e7..220dd24b69b 100644
--- a/test/e2e/framework/service/jig.go
+++ b/test/e2e/framework/service/jig.go
@@ -260,23 +260,38 @@ func (j *TestJig) CreateLoadBalancerService(timeout time.Duration, tweak func(sv
 // GetEndpointNodes returns a map of nodenames:external-ip on which the
 // endpoints of the Service are running.
 func (j *TestJig) GetEndpointNodes() (map[string][]string, error) {
-	nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
-	if err != nil {
-		return nil, err
-	}
-	epNodes, err := j.GetEndpointNodeNames()
+	nodes, err := j.ListNodesWithEndpoint()
 	if err != nil {
 		return nil, err
 	}
 	nodeMap := map[string][]string{}
-	for _, n := range nodes.Items {
-		if epNodes.Has(n.Name) {
-			nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
-		}
+	for _, node := range nodes {
+		nodeMap[node.Name] = e2enode.GetAddresses(&node, v1.NodeExternalIP)
 	}
 	return nodeMap, nil
 }

+// ListNodesWithEndpoint returns a list of nodes on which the
+// endpoints of the given Service are running.
+func (j *TestJig) ListNodesWithEndpoint() ([]v1.Node, error) {
+	nodeNames, err := j.GetEndpointNodeNames()
+	if err != nil {
+		return nil, err
+	}
+	ctx := context.TODO()
+	allNodes, err := j.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	epNodes := make([]v1.Node, 0, nodeNames.Len())
+	for _, node := range allNodes.Items {
+		if nodeNames.Has(node.Name) {
+			epNodes = append(epNodes, node)
+		}
+	}
+	return epNodes, nil
+}
+
 // GetEndpointNodeNames returns a string set of node names on which the
 // endpoints of the given Service are running.
 func (j *TestJig) GetEndpointNodeNames() (sets.String, error) {
diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index 61f3468d89b..301b3edd5ae 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -17,7 +17,6 @@ limitations under the License.
 package network

 import (
-	"bytes"
 	"context"
 	"encoding/json"
 	"errors"
@@ -3283,42 +3282,72 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
 			framework.Failf("Service HealthCheck NodePort still present")
 		}

-		endpointNodeMap, err := jig.GetEndpointNodes()
+		epNodes, err := jig.ListNodesWithEndpoint()
 		framework.ExpectNoError(err)
-		noEndpointNodeMap := map[string][]string{}
-		for _, n := range nodes.Items {
-			if _, ok := endpointNodeMap[n.Name]; ok {
-				continue
+		// map from name of nodes with endpoint to internal ip
+		// it is assumed that there is only a single node with the endpoint
+		endpointNodeMap := make(map[string]string)
+		// map from name of nodes without endpoint to internal ip
+		noEndpointNodeMap := make(map[string]string)
+		for _, node := range epNodes {
+			ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
+			if len(ips) < 1 {
+				framework.Failf("No internal ip found for node %s", node.Name)
 			}
-			noEndpointNodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
+			endpointNodeMap[node.Name] = ips[0]
 		}
+		for _, n := range nodes.Items {
+			ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
+			if len(ips) < 1 {
+				framework.Failf("No internal ip found for node %s", n.Name)
+			}
+			if _, ok := endpointNodeMap[n.Name]; !ok {
+				noEndpointNodeMap[n.Name] = ips[0]
+			}
+		}
+		framework.ExpectNotEqual(len(endpointNodeMap), 0)
+		framework.ExpectNotEqual(len(noEndpointNodeMap), 0)

 		svcTCPPort := int(svc.Spec.Ports[0].Port)
 		svcNodePort := int(svc.Spec.Ports[0].NodePort)
 		ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
 		path := "/clientip"
+		dialCmd := "clientip"
+
+		config := e2enetwork.NewNetworkingTestConfig(f, false, false)

 		ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
-		for nodeName, nodeIPs := range noEndpointNodeMap {
-			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], svcNodePort, path))
-			GetHTTPContent(nodeIPs[0], svcNodePort, e2eservice.KubeProxyLagTimeout, path)
+		for nodeName, nodeIP := range noEndpointNodeMap {
+			ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP, svcNodePort, dialCmd))
+			_, err := GetHTTPContentFromTestContainer(config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
+			framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
 		}

-		for nodeName, nodeIPs := range endpointNodeMap {
-			ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
-			var body bytes.Buffer
-			pollfn := func() (bool, error) {
-				result := e2enetwork.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil)
-				if result.Code == 0 {
+		for nodeName, nodeIP := range endpointNodeMap {
+			ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
+			var body string
+			pollFn := func() (bool, error) {
+				// we expect connection failure here, but not other errors
+
+				resp, err := config.GetResponseFromTestContainer(
+					"http",
+					"healthz",
+					nodeIP,
+					healthCheckNodePort)
+				if err != nil {
+					return false, nil
+				}
+				if len(resp.Errors) > 0 {
 					return true, nil
 				}
-				body.Reset()
-				body.Write(result.Body)
+				if len(resp.Responses) > 0 {
+					body = resp.Responses[0]
+				}
 				return false, nil
 			}
-			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollfn); pollErr != nil {
+			if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil {
 				framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
-					nodeName, healthCheckNodePort, body.String())
+					nodeName, healthCheckNodePort, body)
 			}
 		}

diff --git a/test/e2e/network/util.go b/test/e2e/network/util.go
index 13acfd6d38b..dfa3bd4dd49 100644
--- a/test/e2e/network/util.go
+++ b/test/e2e/network/util.go
@@ -53,6 +53,23 @@ func GetHTTPContent(host string, port int, timeout time.Duration, url string) by
 	return body
 }

+// GetHTTPContentFromTestContainer returns the content of the given url by HTTP via a test container.
+func GetHTTPContentFromTestContainer(config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, dialCmd string) (string, error) {
+	var body string
+	pollFn := func() (bool, error) {
+		resp, err := config.GetResponseFromTestContainer("http", dialCmd, host, port)
+		if err != nil || len(resp.Errors) > 0 || len(resp.Responses) == 0 {
+			return false, nil
+		}
+		body = resp.Responses[0]
+		return true, nil
+	}
+	if pollErr := wait.PollImmediate(framework.Poll, timeout, pollFn); pollErr != nil {
+		return "", pollErr
+	}
+	return body, nil
+}
+
 // DescribeSvc logs the output of kubectl describe svc for the given namespace
 func DescribeSvc(ns string) {
 	framework.Logf("\nOutput of kubectl describe svc:\n")