diff --git a/test/e2e/service.go b/test/e2e/service.go index 7d26d5262fa..a193abbfab0 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -17,6 +17,7 @@ limitations under the License. package e2e import ( + "bytes" "fmt" "io/ioutil" "math/rand" @@ -31,6 +32,7 @@ import ( . "github.com/onsi/gomega" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/service" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/labels" @@ -1013,6 +1015,89 @@ var _ = framework.KubeDescribe("Services", func() { framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout) } }) + + It("should be able to create services of type LoadBalancer and externalTraffic=localOnly [Slow][Feature:externalTrafficLocalOnly]", func() { + // requires cloud load-balancer support - this feature currently supported only on GCE/GKE + framework.SkipUnlessProviderIs("gce", "gke") + loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault + + largeClusterMinNodesNumber := 100 + if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber { + loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge + } + namespace := f.Namespace.Name + serviceName := "external-local" + jig := NewServiceTestJig(c, serviceName) + By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only") + svc := jig.CreateTCPServiceOrFail(namespace, func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeLoadBalancer + // We need to turn affinity off for our LB distribution tests + svc.Spec.SessionAffinity = api.ServiceAffinityNone + svc.ObjectMeta.Annotations = map[string]string{ + service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal} + svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}} + }) + By("creating a pod to be part of the service " + serviceName) + // This container is an nginx container listening on port 80 + // See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response + jig.RunOrFail(namespace, nil) + By("waiting for loadbalancer for service " + namespace + "/" + serviceName) + svc = jig.WaitForLoadBalancerOrFail(namespace, serviceName, loadBalancerCreateTimeout) + jig.SanityCheckService(svc, api.ServiceTypeLoadBalancer) + svcTcpPort := int(svc.Spec.Ports[0].Port) + framework.Logf("service port : %d", svcTcpPort) + tcpNodePort := int(svc.Spec.Ports[0].NodePort) + framework.Logf("TCP node port: %d", tcpNodePort) + ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) + framework.Logf("TCP load balancer: %s", ingressIP) + healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc)) + By("checking health check node port allocated") + if healthCheckNodePort == 0 { + framework.Failf("Service HealthCheck NodePort was not allocated") + } + nodeIP := pickNodeIP(jig.Client) + By("hitting the TCP service's NodePort on " + nodeIP + ":" + fmt.Sprintf("%d", tcpNodePort)) + jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout) + By("hitting the TCP service's service port, via its external VIP " + ingressIP + ":" + fmt.Sprintf("%d", svcTcpPort)) + jig.TestReachableHTTP(ingressIP, svcTcpPort, kubeProxyLagTimeout) + By("reading clientIP using the TCP service's NodePort") + content := jig.GetHTTPContent(nodeIP, tcpNodePort, kubeProxyLagTimeout, "/clientip") + clientIP := content.String() + framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP) + By("reading clientIP using the TCP service's service port via its external VIP") + content = jig.GetHTTPContent(ingressIP, svcTcpPort, kubeProxyLagTimeout, "/clientip") + clientIP = content.String() + framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP) + By("checking if Source IP is preserved") + if strings.HasPrefix(clientIP, "10.") { + framework.Failf("Source IP was NOT preserved") + } + By("finding nodes for all service endpoints") + endpoints, err := c.Endpoints(namespace).Get(serviceName) + if err != nil { + framework.Failf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err) + } + if len(endpoints.Subsets[0].Addresses) == 0 { + framework.Failf("Expected Ready endpoints - found none") + } + readyHostName := *endpoints.Subsets[0].Addresses[0].NodeName + framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, readyHostName) + // HealthCheck responder validation - iterate over all node IPs and check their HC responses + // Collect all node names and their public IPs - the nodes and ips slices parallel each other + nodes := framework.GetReadySchedulableNodesOrDie(jig.Client) + ips := collectAddresses(nodes, api.NodeExternalIP) + if len(ips) == 0 { + ips = collectAddresses(nodes, api.NodeLegacyHostIP) + } + By("checking kube-proxy health check responses are correct") + for n, publicIP := range ips { + framework.Logf("Checking health check response for node %s, public IP %s", nodes.Items[n].Name, publicIP) + // HealthCheck should pass only on the node where num(endpoints) > 0 + // All other nodes should fail the healthcheck on the service healthCheckNodePort + expectedSuccess := nodes.Items[n].Name == readyHostName + jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, "/healthz", expectedSuccess) + } + }) }) // updateService fetches a service, calls the update function on it, @@ -1247,6 +1332,10 @@ func pickNodeIP(c *client.Client) string { } func testReachableHTTP(ip string, port int, request string, expect string) (bool, error) { + return testReachableHTTPWithContent(ip, port, request, expect, nil) +} + +func testReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) { url := fmt.Sprintf("http://%s:%d%s", ip, port, request) if ip == "" { framework.Failf("Got empty IP for reachability check (%s)", url) @@ -1271,15 +1360,46 @@ func testReachableHTTP(ip string, port int, request string, expect string) (bool return false, nil } if resp.StatusCode != 200 { - return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body)) + return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", + resp.Status, url, string(body)) } if !strings.Contains(string(body), expect) { return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body)) } - framework.Logf("Successfully reached %v", url) + if content != nil { + content.Write(body) + } return true, nil } +func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) { + url := fmt.Sprintf("http://%s:%d%s", ip, port, request) + if ip == "" || port == 0 { + framework.Failf("Got empty IP for reachability check (%s)", url) + return false, fmt.Errorf("Invalid input ip or port") + } + framework.Logf("Testing HTTP health check on %v", url) + resp, err := httpGetNoConnectionPool(url) + if err != nil { + framework.Logf("Got error testing for reachability of %s: %v", url, err) + return false, err + } + defer resp.Body.Close() + if err != nil { + framework.Logf("Got error reading response from %s: %v", url, err) + return false, err + } + // HealthCheck responder returns 503 for no local endpoints + if resp.StatusCode == 503 { + return false, nil + } + // HealthCheck responder returns 200 for non-zero local endpoints + if resp.StatusCode == 200 { + return true, nil + } + return false, fmt.Errorf("Unexpected HTTP response code %s from health check responder at %s", resp.Status, url) +} + func testNotReachableHTTP(ip string, port int) (bool, error) { url := fmt.Sprintf("http://%s:%d", ip, port) if ip == "" { @@ -1806,6 +1926,29 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time } } +func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { + var body bytes.Buffer + if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTPWithContent(host, port, url, "", &body) }); err != nil { + framework.Failf("Could not reach HTTP service through %v:%v/%v after %v: %v", host, port, url, timeout, err) + return body + } + return body +} + +func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, expectedSuccess bool) { + success, err := testHTTPHealthCheckNodePort(host, port, request) + if expectedSuccess && success { + framework.Logf("HealthCheck successful for node %v:%v, as expected", host, port) + return + } else if !expectedSuccess && (!success || err != nil) { + framework.Logf("HealthCheck failed for node %v:%v, as expected", host, port) + return + } else if expectedSuccess { + framework.Failf("HealthCheck NodePort incorrectly reporting unhealthy on %v:%v: %v", host, port, err) + } + framework.Failf("Unexpected HealthCheck NodePort still reporting healthy %v:%v: %v", host, port, err) +} + func getIngressPoint(ing *api.LoadBalancerIngress) string { host := ing.IP if host == "" { @@ -1835,7 +1978,7 @@ func (j *ServiceTestJig) newRCTemplate(namespace string) *api.ReplicationControl Containers: []api.Container{ { Name: "netexec", - Image: "gcr.io/google_containers/netexec:1.4", + Image: "gcr.io/google_containers/netexec:1.6", Args: []string{"--http-port=80", "--udp-port=80"}, ReadinessProbe: &api.Probe{ PeriodSeconds: 3,