fix(e2e): access nodes via test container in LB network tests

Signed-off-by: knight42 <anonymousknight96@gmail.com>
Author: knight42    Date: 2020-05-29 12:00:40 +08:00
parent 2446c3d7e9
commit 708fb6b457
4 changed files with 146 additions and 58 deletions

View File

@@ -160,6 +160,12 @@ type NetworkingTestConfig struct {
Namespace string
}
// NetexecDialResponse represents the response returned by the `netexec` subcommand of `agnhost`
type NetexecDialResponse struct {
Responses []string `json:"responses"`
Errors []string `json:"errors"`
}
// DialFromEndpointContainer executes a curl via kubectl exec in an endpoint container.
func (config *NetworkingTestConfig) DialFromEndpointContainer(protocol, targetIP string, targetPort, maxTries, minTries int, expectedEps sets.String) {
config.DialFromContainer(protocol, echoHostname, config.EndpointPods[0].Status.PodIP, targetIP, EndpointHTTPPort, targetPort, maxTries, minTries, expectedEps)
@@ -212,6 +218,18 @@ func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
return expectedEps
}
func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
// The current versions of curl included in CentOS and RHEL distros
// misinterpret square brackets around IPv6 as globbing, so use the -g
// argument to disable globbing to handle the IPv6 case.
return fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
ipPort,
dialCmd,
protocol,
targetIP,
targetPort)
}
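For illustration only (not in the commit): the command the helper above produces for an assumed IPv6 test pod and target, showing why -g matters once net.JoinHostPort brackets the address.

package main

import (
	"fmt"
	"net"
	"strconv"
)

// Same format string as makeCURLDialCommand above; -g keeps curl from
// treating the [] around a bracketed IPv6 host as a glob pattern.
func makeCURLDialCommand(ipPort, dialCmd, protocol, targetIP string, targetPort int) string {
	return fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
		ipPort, dialCmd, protocol, targetIP, targetPort)
}

func main() {
	// Made-up addresses for the example.
	ipPort := net.JoinHostPort("fd00:10:244::5", strconv.Itoa(8080)) // "[fd00:10:244::5]:8080"
	fmt.Println(makeCURLDialCommand(ipPort, "hostname", "http", "fd00:10:96::1", 80))
	// curl -g -q -s 'http://[fd00:10:244::5]:8080/dial?request=hostname&protocol=http&host=fd00:10:96::1&port=80&tries=1'
}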
// DialFromContainer executes a curl via kubectl exec in a test container,
// which might then translate to a tcp or udp request based on the protocol
// argument in the url.
@@ -232,38 +250,19 @@ func (config *NetworkingTestConfig) EndpointHostnames() sets.String {
// pod and confirm it doesn't show up as an endpoint.
func (config *NetworkingTestConfig) DialFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort, maxTries, minTries int, expectedResponses sets.String) {
ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
// The current versions of curl included in CentOS and RHEL distros
// misinterpret square brackets around IPv6 as globbing, so use the -g
// argument to disable globbing to handle the IPv6 case.
cmd := fmt.Sprintf("curl -g -q -s 'http://%s/dial?request=%s&protocol=%s&host=%s&port=%d&tries=1'",
ipPort,
dialCommand,
protocol,
targetIP,
targetPort)
cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
responses := sets.NewString()
for i := 0; i < maxTries; i++ {
stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
resp, err := config.GetResponseFromContainer(protocol, dialCommand, containerIP, targetIP, containerHTTPPort, targetPort)
if err != nil {
// A failure to kubectl exec counts as a try, not a hard fail.
// Also note that we will keep failing for maxTries in tests where
// we confirm unreachability.
framework.Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr)
} else {
var output map[string][]string
if err := json.Unmarshal([]byte(stdout), &output); err != nil {
framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
cmd, config.HostTestContainerPod.Name, stdout, err)
continue
}
for _, response := range output["responses"] {
trimmed := strings.TrimSpace(response)
if trimmed != "" {
responses.Insert(trimmed)
}
continue
}
for _, response := range resp.Responses {
trimmed := strings.TrimSpace(response)
if trimmed != "" {
responses.Insert(trimmed)
}
}
framework.Logf("Waiting for responses: %v", expectedResponses.Difference(responses))
@@ -314,14 +313,14 @@ func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containe
framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
} else {
framework.Logf("Tries: %d, in try: %d, stdout: %v, stderr: %v, command run in: %#v", tries, i, stdout, stderr, config.HostTestContainerPod)
var output map[string][]string
var output NetexecDialResponse
if err := json.Unmarshal([]byte(stdout), &output); err != nil {
framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
cmd, config.HostTestContainerPod.Name, stdout, err)
continue
}
for _, hostName := range output["responses"] {
for _, hostName := range output.Responses {
trimmed := strings.TrimSpace(hostName)
if trimmed != "" {
eps.Insert(trimmed)
@@ -334,6 +333,34 @@ func (config *NetworkingTestConfig) GetEndpointsFromContainer(protocol, containe
return eps, nil
}
// GetResponseFromContainer executes a curl via kubectl exec in a container.
func (config *NetworkingTestConfig) GetResponseFromContainer(protocol, dialCommand, containerIP, targetIP string, containerHTTPPort, targetPort int) (NetexecDialResponse, error) {
ipPort := net.JoinHostPort(containerIP, strconv.Itoa(containerHTTPPort))
cmd := makeCURLDialCommand(ipPort, dialCommand, protocol, targetIP, targetPort)
stdout, stderr, err := config.f.ExecShellInPodWithFullOutput(config.TestContainerPod.Name, cmd)
if err != nil {
// A failure to kubectl exec counts as a try, not a hard fail.
// Also note that we will keep failing for maxTries in tests where
// we confirm unreachability.
framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", cmd, err, stdout, stderr)
return NetexecDialResponse{}, err
}
var output NetexecDialResponse
if err := json.Unmarshal([]byte(stdout), &output); err != nil {
framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
cmd, config.TestContainerPod.Name, stdout, err)
return NetexecDialResponse{}, err
}
return output, nil
}
// GetResponseFromTestContainer executes a curl via kubectl exec in a test container.
func (config *NetworkingTestConfig) GetResponseFromTestContainer(protocol, dialCommand, targetIP string, targetPort int) (NetexecDialResponse, error) {
return config.GetResponseFromContainer(protocol, dialCommand, config.TestContainerPod.Status.PodIP, targetIP, testContainerHTTPPort, targetPort)
}
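Not part of the diff: a hedged sketch of how a caller can interpret the two fields of the returned NetexecDialResponse, written as if it lived in the same package. The "clientip" dial command is borrowed from the ESIPP test below; the helper name is hypothetical.

// Hypothetical helper: dial nodeIP:port from the test container and report
// whether it was reachable. kubectl exec / JSON errors are returned to the
// caller for retrying; dial failures show up in resp.Errors instead.
func portReachableFromTestContainer(config *NetworkingTestConfig, nodeIP string, port int) (bool, error) {
	resp, err := config.GetResponseFromTestContainer("http", "clientip", nodeIP, port)
	if err != nil {
		return false, err
	}
	if len(resp.Errors) > 0 {
		return false, nil
	}
	return len(resp.Responses) > 0, nil
}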
// DialFromNode executes a tcp or udp request based on protocol via kubectl exec
// in a test container running with host networking.
// - minTries is the minimum number of curl attempts required before declaring

View File

@@ -260,23 +260,38 @@ func (j *TestJig) CreateLoadBalancerService(timeout time.Duration, tweak func(sv
// GetEndpointNodes returns a map of nodenames:external-ip on which the
// endpoints of the Service are running.
func (j *TestJig) GetEndpointNodes() (map[string][]string, error) {
nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests)
if err != nil {
return nil, err
}
epNodes, err := j.GetEndpointNodeNames()
nodes, err := j.ListNodesWithEndpoint()
if err != nil {
return nil, err
}
nodeMap := map[string][]string{}
for _, n := range nodes.Items {
if epNodes.Has(n.Name) {
nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
}
for _, node := range nodes {
nodeMap[node.Name] = e2enode.GetAddresses(&node, v1.NodeExternalIP)
}
return nodeMap, nil
}
// ListNodesWithEndpoint returns a list of nodes on which the
// endpoints of the given Service are running.
func (j *TestJig) ListNodesWithEndpoint() ([]v1.Node, error) {
nodeNames, err := j.GetEndpointNodeNames()
if err != nil {
return nil, err
}
ctx := context.TODO()
allNodes, err := j.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
epNodes := make([]v1.Node, 0, nodeNames.Len())
for _, node := range allNodes.Items {
if nodeNames.Has(node.Name) {
epNodes = append(epNodes, node)
}
}
return epNodes, nil
}
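Not in the commit: a sketch of the pattern the ESIPP test below builds on top of this helper, mapping each endpoint node to one of its internal IPs. The helper name is hypothetical and the jig package's existing e2enode, v1 and fmt imports are assumed.

// Hypothetical helper: internal IP per node that hosts an endpoint.
func endpointNodeInternalIPs(j *TestJig) (map[string]string, error) {
	nodes, err := j.ListNodesWithEndpoint()
	if err != nil {
		return nil, err
	}
	ips := make(map[string]string, len(nodes))
	for _, node := range nodes {
		addrs := e2enode.GetAddresses(&node, v1.NodeInternalIP)
		if len(addrs) == 0 {
			return nil, fmt.Errorf("no internal IP found for node %s", node.Name)
		}
		ips[node.Name] = addrs[0]
	}
	return ips, nil
}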
// GetEndpointNodeNames returns a string set of node names on which the
// endpoints of the given Service are running.
func (j *TestJig) GetEndpointNodeNames() (sets.String, error) {

View File

@@ -17,7 +17,6 @@ limitations under the License.
package network
import (
"bytes"
"context"
"encoding/json"
"errors"
@@ -3283,42 +3282,72 @@ var _ = SIGDescribe("ESIPP [Slow]", func() {
framework.Failf("Service HealthCheck NodePort still present")
}
endpointNodeMap, err := jig.GetEndpointNodes()
epNodes, err := jig.ListNodesWithEndpoint()
framework.ExpectNoError(err)
noEndpointNodeMap := map[string][]string{}
for _, n := range nodes.Items {
if _, ok := endpointNodeMap[n.Name]; ok {
continue
// map from name of nodes with endpoint to internal ip
// it is assumed that there is only a single node with the endpoint
endpointNodeMap := make(map[string]string)
// map from name of nodes without endpoint to internal ip
noEndpointNodeMap := make(map[string]string)
for _, node := range epNodes {
ips := e2enode.GetAddresses(&node, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", node.Name)
}
noEndpointNodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP)
endpointNodeMap[node.Name] = ips[0]
}
for _, n := range nodes.Items {
ips := e2enode.GetAddresses(&n, v1.NodeInternalIP)
if len(ips) < 1 {
framework.Failf("No internal ip found for node %s", n.Name)
}
if _, ok := endpointNodeMap[n.Name]; !ok {
noEndpointNodeMap[n.Name] = ips[0]
}
}
framework.ExpectNotEqual(len(endpointNodeMap), 0)
framework.ExpectNotEqual(len(noEndpointNodeMap), 0)
svcTCPPort := int(svc.Spec.Ports[0].Port)
svcNodePort := int(svc.Spec.Ports[0].NodePort)
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
path := "/clientip"
dialCmd := "clientip"
config := e2enetwork.NewNetworkingTestConfig(f, false, false)
ginkgo.By(fmt.Sprintf("endpoints present on nodes %v, absent on nodes %v", endpointNodeMap, noEndpointNodeMap))
for nodeName, nodeIPs := range noEndpointNodeMap {
ginkgo.By(fmt.Sprintf("Checking %v (%v:%v%v) proxies to endpoints on another node", nodeName, nodeIPs[0], svcNodePort, path))
GetHTTPContent(nodeIPs[0], svcNodePort, e2eservice.KubeProxyLagTimeout, path)
for nodeName, nodeIP := range noEndpointNodeMap {
ginkgo.By(fmt.Sprintf("Checking %v (%v:%v/%v) proxies to endpoints on another node", nodeName, nodeIP[0], svcNodePort, dialCmd))
_, err := GetHTTPContentFromTestContainer(config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, dialCmd)
framework.ExpectNoError(err, "Could not reach HTTP service through %v:%v/%v after %v", nodeIP, svcNodePort, dialCmd, e2eservice.KubeProxyLagTimeout)
}
for nodeName, nodeIPs := range endpointNodeMap {
ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
var body bytes.Buffer
pollfn := func() (bool, error) {
result := e2enetwork.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil)
if result.Code == 0 {
for nodeName, nodeIP := range endpointNodeMap {
ginkgo.By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIP))
var body string
pollFn := func() (bool, error) {
// we expect connection failure here, but not other errors
resp, err := config.GetResponseFromTestContainer(
"http",
"healthz",
nodeIP,
healthCheckNodePort)
if err != nil {
return false, nil
}
if len(resp.Errors) > 0 {
return true, nil
}
body.Reset()
body.Write(result.Body)
if len(resp.Responses) > 0 {
body = resp.Responses[0]
}
return false, nil
}
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollfn); pollErr != nil {
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.TestTimeout, pollFn); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
nodeName, healthCheckNodePort, body.String())
nodeName, healthCheckNodePort, body)
}
}
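The polling loop above relies on wait.PollImmediate semantics: returning (false, nil) retries, (true, nil) ends the poll successfully, a non-nil error aborts, and running out of timeout yields wait.ErrWaitTimeout. A self-contained sketch of just those semantics, unrelated to the test fixtures:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// The condition runs immediately and then every 200ms until it returns
	// true or the 2s timeout expires; a non-nil error would abort the poll.
	err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) {
		attempts++
		return attempts >= 3, nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}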

View File

@@ -53,6 +53,23 @@ func GetHTTPContent(host string, port int, timeout time.Duration, url string) by
return body
}
// GetHTTPContentFromTestContainer returns the content of the given url by HTTP via a test container.
func GetHTTPContentFromTestContainer(config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, dialCmd string) (string, error) {
var body string
pollFn := func() (bool, error) {
resp, err := config.GetResponseFromTestContainer("http", dialCmd, host, port)
if err != nil || len(resp.Errors) > 0 || len(resp.Responses) == 0 {
return false, nil
}
body = resp.Responses[0]
return true, nil
}
if pollErr := wait.PollImmediate(framework.Poll, timeout, pollFn); pollErr != nil {
return "", pollErr
}
return body, nil
}
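Not part of the diff: a hedged follow-up to the helper above, written as if it lived in the same test package. It assumes (as the ESIPP tests do) that agnhost's /clientip handler answers with the client's "ip:port"; the function name and its parameters are made up.

// Hypothetical check: fetch /clientip through the in-cluster test container
// and log the source address the endpoint observed.
func logClientIPSeenByEndpoint(config *e2enetwork.NetworkingTestConfig, nodeIP string, svcNodePort int) {
	body, err := GetHTTPContentFromTestContainer(config, nodeIP, svcNodePort, e2eservice.KubeProxyLagTimeout, "clientip")
	framework.ExpectNoError(err, "could not reach HTTP service through %v:%v", nodeIP, svcNodePort)
	// /clientip is expected to reply with "ip:port" (assumption noted above).
	clientIP, _, err := net.SplitHostPort(body)
	framework.ExpectNoError(err, "unexpected /clientip body %q", body)
	framework.Logf("client address seen by the endpoint: %s", clientIP)
}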
// DescribeSvc logs the output of kubectl describe svc for the given namespace
func DescribeSvc(ns string) {
framework.Logf("\nOutput of kubectl describe svc:\n")