From b7782e73b648c8d866287ff5cc19a6efd1164170 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 29 Jan 2016 20:35:32 -0800 Subject: [PATCH] Service e2e cleanup Make a new jig that is based on the netexec container. Change the LB tests to use this new jig and leave TODOs for other tests. Add UDP testing to the main mutability test. Flatten the "identical names" test into the mutability test - it is now the only load-balancer test (speedup). Create LBs in parallel. --- test/e2e/cluster_upgrade.go | 54 ++ test/e2e/service.go | 1224 ++++++++++++++++++++--------------- test/e2e/util.go | 6 +- 3 files changed, 744 insertions(+), 540 deletions(-) diff --git a/test/e2e/cluster_upgrade.go b/test/e2e/cluster_upgrade.go index 27aff06490c..3c1bc14f208 100644 --- a/test/e2e/cluster_upgrade.go +++ b/test/e2e/cluster_upgrade.go @@ -610,3 +610,57 @@ func migRollingUpdatePoll(id string, nt time.Duration) error { Logf("MIG rolling update complete after %v", time.Since(start)) return nil } + +func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool { + return testLoadBalancerReachableInTime(ingress, port, loadBalancerLagTimeout) +} + +func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { + ip := ingress.IP + if ip == "" { + ip = ingress.Hostname + } + + return testReachableInTime(conditionFuncDecorator(ip, port, testReachableHTTP, "/", "test-webserver"), timeout) + +} + +func conditionFuncDecorator(ip string, port int, fn func(string, int, string, string) (bool, error), request string, expect string) wait.ConditionFunc { + return func() (bool, error) { + return fn(ip, port, request, expect) + } +} + +func testReachableInTime(testFunc wait.ConditionFunc, timeout time.Duration) bool { + By(fmt.Sprintf("Waiting up to %v", timeout)) + err := wait.PollImmediate(poll, timeout, testFunc) + if err != nil { + Expect(err).NotTo(HaveOccurred(), "Error waiting") + return false + } + return true +} + +func waitForLoadBalancerIngress(c *client.Client, serviceName, namespace string) (*api.Service, error) { + // TODO: once support ticket 21807001 is resolved, reduce this timeout + // back to something reasonable + const timeout = 20 * time.Minute + var service *api.Service + By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a LoadBalancer ingress point", timeout, serviceName, namespace)) + i := 1 + for start := time.Now(); time.Since(start) < timeout; time.Sleep(3 * time.Second) { + service, err := c.Services(namespace).Get(serviceName) + if err != nil { + Logf("Get service failed, ignoring for 5s: %v", err) + continue + } + if len(service.Status.LoadBalancer.Ingress) > 0 { + return service, nil + } + if i%5 == 0 { + Logf("Waiting for service %s in namespace %s to have a LoadBalancer ingress point (%v)", serviceName, namespace, time.Since(start)) + } + i++ + } + return service, fmt.Errorf("service %s in namespace %s doesn't have a LoadBalancer ingress point after %.2f seconds", serviceName, namespace, timeout.Seconds()) +} diff --git a/test/e2e/service.go b/test/e2e/service.go index d63e80da319..7ebf3d3f96b 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/intstr" @@ -44,6 +45,13 @@ import ( // notice a Service update, such as type=NodePort. 
const kubeProxyLagTimeout = 45 * time.Second +// Maximum time a load balancer is allowed to not respond after creation. +const loadBalancerLagTimeout = 2 * time.Minute + +// How long to wait for a load balancer to be created/modified. +//TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable +const loadBalancerCreateTimeout = 20 * time.Minute + // This should match whatever the default/configured range is var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768} @@ -81,6 +89,7 @@ var _ = Describe("Services", func() { }) It("should serve a basic endpoint from pods [Conformance]", func() { + // TODO: use the ServiceTestJig here serviceName := "endpoint-test2" ns := f.Namespace.Name labels := map[string]string{ @@ -140,6 +149,7 @@ var _ = Describe("Services", func() { }) It("should serve multiport endpoints from pods [Conformance]", func() { + // TODO: use the ServiceTestJig here // repacking functionality is intentionally not tested here - it's better to test it in an integration test. serviceName := "multi-endpoint-test" ns := f.Namespace.Name @@ -223,6 +233,7 @@ var _ = Describe("Services", func() { }) It("should be able to up and down services", func() { + // TODO: use the ServiceTestJig here // this test uses NodeSSHHosts that does not work if a Node only reports LegacyHostIP SkipUnlessProviderIs(providersWithSSH...) ns := f.Namespace.Name @@ -265,6 +276,7 @@ var _ = Describe("Services", func() { }) It("should work after restarting kube-proxy [Disruptive]", func() { + // TODO: use the ServiceTestJig here SkipUnlessProviderIs("gce", "gke") ns := f.Namespace.Name @@ -316,6 +328,7 @@ var _ = Describe("Services", func() { }) It("should work after restarting apiserver [Disruptive]", func() { + // TODO: use the ServiceTestJig here // TODO: restartApiserver doesn't work in GKE - fix it and reenable this test. SkipUnlessProviderIs("gce") @@ -360,52 +373,24 @@ var _ = Describe("Services", func() { // configured with a default deny firewall to validate that the // proxy whitelists NodePort traffic. 
It("should be able to create a functioning NodePort service", func() { - serviceName := "nodeportservice-test" + serviceName := "nodeport-test" ns := f.Namespace.Name - t := NewServerTest(c, ns, serviceName) - defer func() { - defer GinkgoRecover() - errs := t.Cleanup() - if len(errs) != 0 { - Failf("errors in cleanup: %v", errs) - } - }() - - service := t.BuildServiceSpec() - service.Spec.Type = api.ServiceTypeNodePort + jig := NewServiceTestJig(c, serviceName) + nodeIP := pickNodeIP(jig.Client) // for later By("creating service " + serviceName + " with type=NodePort in namespace " + ns) - result, err := c.Services(ns).Create(service) - Expect(err).NotTo(HaveOccurred()) - defer func(ns, serviceName string) { // clean up when we're done - By("deleting service " + serviceName + " in namespace " + ns) - err := c.Services(ns).Delete(serviceName) - Expect(err).NotTo(HaveOccurred()) - }(ns, serviceName) - - if len(result.Spec.Ports) != 1 { - Failf("got unexpected number (%d) of Ports for NodePort service: %v", len(result.Spec.Ports), result) - } - - nodePort := result.Spec.Ports[0].NodePort - if nodePort == 0 { - Failf("got unexpected nodePort (%d) on Ports[0] for NodePort service: %v", nodePort, result) - } - if !ServiceNodePortRange.Contains(nodePort) { - Failf("got unexpected (out-of-range) port for NodePort service: %v", result) - } + service := jig.CreateTCPServiceOrFail(ns, func(svc *api.Service) { + svc.Spec.Type = api.ServiceTypeNodePort + }) + jig.SanityCheckService(service, api.ServiceTypeNodePort) + nodePort := service.Spec.Ports[0].NodePort By("creating pod to be part of service " + serviceName) - t.CreateWebserverRC(1) + jig.RunOrFail(ns, nil) By("hitting the pod through the service's NodePort") - ip := pickNodeIP(c) - // Loop for kubeProxyLagTimeout, because different kube-proxies might take - // different times to notice the new Service and open up the node port. - if err := wait.PollImmediate(poll, kubeProxyLagTimeout, func() (bool, error) { return testReachable(ip, nodePort) }); err != nil { - Failf("Could not reach nodePort service through node-ip %v:%v in %v", ip, nodePort, kubeProxyLagTimeout) - } + jig.TestReachableHTTP(nodeIP, nodePort, kubeProxyLagTimeout) By("verifying the node port is locked") hostExec := LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec") @@ -414,202 +399,274 @@ var _ = Describe("Services", func() { cmd := fmt.Sprintf(`for i in $(seq 1 300); do if ss -ant46 'sport = :%d' | grep ^LISTEN; then exit 0; fi; sleep 1; done; exit 1`, nodePort) stdout, err := RunHostCmd(hostExec.Namespace, hostExec.Name, cmd) if err != nil { - Failf("expected node port (%d) to be in use, stdout: %v", nodePort, stdout) + Failf("expected node port %d to be in use, stdout: %v", nodePort, stdout) } }) - It("should be able to change the type and nodeport settings of a service", func() { + It("should be able to change the type and ports of a service", func() { // requires cloud load-balancer support SkipUnlessProviderIs("gce", "gke", "aws") - serviceName := "mutability-service-test" + // This test is more monolithic than we'd like because LB turnup can be + // very slow, so we lumped all the tests into one LB lifecycle. 
- t := NewServerTest(f.Client, f.Namespace.Name, serviceName) - defer func() { - defer GinkgoRecover() - errs := t.Cleanup() - if len(errs) != 0 { - Failf("errors in cleanup: %v", errs) - } - }() + serviceName := "mutability-test" + ns1 := f.Namespace.Name // LB1 in ns1 on TCP + Logf("namespace for TCP test: %s", ns1) - service := t.BuildServiceSpec() - - By("creating service " + serviceName + " with type unspecified in namespace " + t.Namespace) - service, err := t.CreateService(service) + By("creating a second namespace") + namespacePtr, err := createTestingNS("services", c, nil) Expect(err).NotTo(HaveOccurred()) + ns2 := namespacePtr.Name // LB2 in ns2 on UDP + Logf("namespace for UDP test: %s", ns2) + extraNamespaces = append(extraNamespaces, ns2) - if service.Spec.Type != api.ServiceTypeClusterIP { - Failf("got unexpected Spec.Type for default service: %v", service) - } - if len(service.Spec.Ports) != 1 { - Failf("got unexpected len(Spec.Ports) for default service: %v", service) - } - port := service.Spec.Ports[0] - if port.NodePort != 0 { - Failf("got unexpected Spec.Ports[0].nodePort for default service: %v", service) - } - if len(service.Status.LoadBalancer.Ingress) != 0 { - Failf("got unexpected len(Status.LoadBalancer.Ingress) for default service: %v", service) - } + jig := NewServiceTestJig(c, serviceName) + nodeIP := pickNodeIP(jig.Client) // for later - By("creating pod to be part of service " + t.ServiceName) - t.CreateWebserverRC(1) + // Test TCP and UDP Services. Services with the same name in different + // namespaces should get different node ports and load balancers. - By("changing service " + serviceName + " to type=NodePort") - service, err = updateService(f.Client, f.Namespace.Name, serviceName, func(s *api.Service) { + By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1) + tcpService := jig.CreateTCPServiceOrFail(ns1, nil) + jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP) + + By("creating a UDP service " + serviceName + " with type=ClusterIP in namespace " + ns2) + udpService := jig.CreateUDPServiceOrFail(ns2, nil) + jig.SanityCheckService(udpService, api.ServiceTypeClusterIP) + + By("verifying that TCP and UDP use the same port") + if tcpService.Spec.Ports[0].Port != udpService.Spec.Ports[0].Port { + Failf("expected to use the same port for TCP and UDP") + } + svcPort := tcpService.Spec.Ports[0].Port + Logf("service port (TCP and UDP): %d", svcPort) + + By("creating a pod to be part of the TCP service " + serviceName) + jig.RunOrFail(ns1, nil) + + By("creating a pod to be part of the UDP service " + serviceName) + jig.RunOrFail(ns2, nil) + + // Change the services to NodePort. 
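+		// Moving from ClusterIP to NodePort should allocate a port from
+		// ServiceNodePortRange, which kube-proxy then opens on every node;
+		// SanityCheckService verifies the allocation, and the reachability
+		// checks allow up to kubeProxyLagTimeout for kube-proxy to notice.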
+ + By("changing the TCP service " + serviceName + " to type=NodePort") + tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) { s.Spec.Type = api.ServiceTypeNodePort }) - Expect(err).NotTo(HaveOccurred()) + jig.SanityCheckService(tcpService, api.ServiceTypeNodePort) + tcpNodePort := tcpService.Spec.Ports[0].NodePort + Logf("TCP node port: %d", tcpNodePort) - if service.Spec.Type != api.ServiceTypeNodePort { - Failf("got unexpected Spec.Type for NodePort service: %v", service) - } - if len(service.Spec.Ports) != 1 { - Failf("got unexpected len(Spec.Ports) for NodePort service: %v", service) - } - port = service.Spec.Ports[0] - if port.NodePort == 0 { - Failf("got unexpected Spec.Ports[0].nodePort for NodePort service: %v", service) - } - if !ServiceNodePortRange.Contains(port.NodePort) { - Failf("got unexpected (out-of-range) port for NodePort service: %v", service) - } - if len(service.Status.LoadBalancer.Ingress) != 0 { - Failf("got unexpected len(Status.LoadBalancer.Ingress) for NodePort service: %v", service) + By("changing the UDP service " + serviceName + " to type=NodePort") + udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) { + s.Spec.Type = api.ServiceTypeNodePort + }) + jig.SanityCheckService(udpService, api.ServiceTypeNodePort) + udpNodePort := udpService.Spec.Ports[0].NodePort + Logf("UDP node port: %d", udpNodePort) + + By("hitting the TCP service's NodePort") + jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout) + + By("hitting the UDP service's NodePort") + jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout) + + // Change the services to LoadBalancer. + + requestedIP := "" + if providerIs("gce", "gke") { + By("creating a static load balancer IP") + rand.Seed(time.Now().UTC().UnixNano()) + staticIPName := fmt.Sprintf("e2e-external-lb-test-%d", rand.Intn(65535)) + requestedIP, err = createGCEStaticIP(staticIPName) + Expect(err).NotTo(HaveOccurred()) + defer func() { + // Release GCE static IP - this is not kube-managed and will not be automatically released. + deleteGCEStaticIP(staticIPName) + }() + Logf("Allocated static load balancer IP: %s", requestedIP) } - By("hitting the pod through the service's NodePort") - ip := pickNodeIP(f.Client) - nodePort1 := port.NodePort // Save for later! - - // Loop for kubeProxyLagTimeout, because different kube-proxies might take - // different times to notice the new Service and open up the node port. 
- if err := wait.PollImmediate(poll, kubeProxyLagTimeout, func() (bool, error) { return testReachable(ip, nodePort1) }); err != nil { - Failf("Could not reach nodePort service through node-ip %v:%v in %v", ip, nodePort1, kubeProxyLagTimeout) - } - - By("changing service " + serviceName + " to type=LoadBalancer") - service, err = updateService(f.Client, f.Namespace.Name, serviceName, func(s *api.Service) { + By("changing the TCP service " + serviceName + " to type=LoadBalancer") + tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) { + s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable s.Spec.Type = api.ServiceTypeLoadBalancer }) - Expect(err).NotTo(HaveOccurred()) - // Wait for the load balancer to be created asynchronously - service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) - Expect(err).NotTo(HaveOccurred()) - - if service.Spec.Type != api.ServiceTypeLoadBalancer { - Failf("got unexpected Spec.Type for LoadBalancer service: %v", service) - } - if len(service.Spec.Ports) != 1 { - Failf("got unexpected len(Spec.Ports) for LoadBalancer service: %v", service) - } - port = service.Spec.Ports[0] - if port.NodePort != nodePort1 { - Failf("got unexpected Spec.Ports[0].nodePort for LoadBalancer service: %v", service) - } - if len(service.Status.LoadBalancer.Ingress) != 1 { - Failf("got unexpected len(Status.LoadBalancer.Ingress) for LoadBalancer service: %v", service) - } - ingress1 := service.Status.LoadBalancer.Ingress[0] - if ingress1.IP == "" && ingress1.Hostname == "" { - Failf("got unexpected Status.LoadBalancer.Ingress[0] for LoadBalancer service: %v", service) - } - - By("hitting the pod through the service's NodePort") - ip = pickNodeIP(f.Client) - - // Loop for kubeProxyLagTimeout, because different kube-proxies might take - // different times to notice the new Service and open up the node port. 
- if err := wait.PollImmediate(poll, kubeProxyLagTimeout, func() (bool, error) { return testReachable(ip, nodePort1) }); err != nil { - Failf("Could not reach nodePort service through node-ip %v:%v in %v", ip, nodePort1, kubeProxyLagTimeout) - } - - By("hitting the pod through the service's LoadBalancer") - testLoadBalancerReachable(ingress1, 80) - - By("changing service " + serviceName + ": update NodePort") - nodePort2 := 0 - for i := 1; i < ServiceNodePortRange.Size; i++ { - offs1 := nodePort1 - ServiceNodePortRange.Base - offs2 := (offs1 + i) % ServiceNodePortRange.Size - nodePort2 = ServiceNodePortRange.Base + offs2 - service, err = updateService(f.Client, f.Namespace.Name, serviceName, func(s *api.Service) { - s.Spec.Ports[0].NodePort = nodePort2 - }) - if err != nil && strings.Contains(err.Error(), "provided port is already allocated") { - Logf("nodePort %d is busy, will retry", nodePort2) - continue - } - // Otherwise err was nil or err was a real error - break - } - Expect(err).NotTo(HaveOccurred()) - - if service.Spec.Type != api.ServiceTypeLoadBalancer { - Failf("got unexpected Spec.Type for updated-LoadBalancer service: %v", service) - } - if len(service.Spec.Ports) != 1 { - Failf("got unexpected len(Spec.Ports) for updated-LoadBalancer service: %v", service) - } - port = service.Spec.Ports[0] - if port.NodePort != nodePort2 { - Failf("got unexpected Spec.Ports[0].nodePort for LoadBalancer service: %v", service) - } - if len(service.Status.LoadBalancer.Ingress) != 1 { - Failf("got unexpected len(Status.LoadBalancer.Ingress) for LoadBalancer service: %v", service) - } - - By("hitting the pod through the service's updated NodePort") - - // Loop for kubeProxyLagTimeout, because different kube-proxies might take - // different times to notice the new Service and open up the node port. 
- if err := wait.PollImmediate(poll, kubeProxyLagTimeout, func() (bool, error) { return testReachable(ip, nodePort2) }); err != nil { - Failf("Could not reach nodePort service through node-ip %v:%v in %v", ip, nodePort2, kubeProxyLagTimeout) - } - - By("checking the old NodePort is closed") - testNotReachable(ip, nodePort1) - - servicePort := 80 - By("hitting the pod through the service's LoadBalancer") - i := 1 - for start := time.Now(); time.Since(start) < podStartTimeout; time.Sleep(3 * time.Second) { - service, err = waitForLoadBalancerIngress(f.Client, serviceName, f.Namespace.Name) - Expect(err).NotTo(HaveOccurred()) - - ingress2 := service.Status.LoadBalancer.Ingress[0] - if testLoadBalancerReachable(ingress2, servicePort) { - break - } - - if i%5 == 0 { - Logf("Waiting for load-balancer changes (%v elapsed, will retry)", time.Since(start)) - } - i++ - } - - By("updating service's port " + serviceName + " and reaching it at the same IP") - service, err = updateService(f.Client, f.Namespace.Name, serviceName, func(s *api.Service) { - s.Spec.Ports[0].Port = 19482 // chosen arbitrarily to not conflict with port 80 + By("changing the UDP service " + serviceName + " to type=LoadBalancer") + udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) { + s.Spec.Type = api.ServiceTypeLoadBalancer }) - Expect(err).NotTo(HaveOccurred()) - port = service.Spec.Ports[0] - if !testLoadBalancerReachable(service.Status.LoadBalancer.Ingress[0], port.Port) { - Failf("Failed to reach load balancer at original ingress after updating its port: %+v", service) + + By("waiting for the TCP service " + serviceName + " to have a load balancer") + // Wait for the load balancer to be created asynchronously + tcpService = jig.WaitForLoadBalancerOrFail(ns1, tcpService.Name) + jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer) + if tcpService.Spec.Ports[0].NodePort != tcpNodePort { + Failf("TCP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", tcpNodePort, tcpService.Spec.Ports[0].NodePort) + } + if requestedIP != "" && getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP { + Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])) + } + tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) + Logf("TCP load balancer: %s", tcpIngressIP) + + By("waiting for the UDP service " + serviceName + " to have a load balancer") + // 2nd one should be faster since they ran in parallel. 
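+	// WaitForLoadBalancerOrFail polls the service until
+	// Status.LoadBalancer.Ingress is populated, up to loadBalancerCreateTimeout
+	// (20 minutes until support ticket 21807001 is resolved).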
+		udpService = jig.WaitForLoadBalancerOrFail(ns2, udpService.Name)
+		jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
+		if udpService.Spec.Ports[0].NodePort != udpNodePort {
+			Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
+		}
+		udpIngressIP := getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
+		Logf("UDP load balancer: %s", udpIngressIP)
+
+		By("verifying that TCP and UDP use different load balancers")
+		if tcpIngressIP == udpIngressIP {
+			Failf("Load balancers are not different: %s", getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
+		}

-		removeExternalLoadBalancer(f, service)
+		By("hitting the TCP service's NodePort")
+		jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)

-		By("checking the NodePort is closed")
-		ip = pickNodeIP(f.Client)
-		testNotReachable(ip, nodePort2)
-		By("checking the LoadBalancer is closed")
-		testLoadBalancerNotReachable(ingress1, port.Port)
+		By("hitting the UDP service's NodePort")
+		jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
+
+		By("hitting the TCP service's LoadBalancer")
+		jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
+
+		By("hitting the UDP service's LoadBalancer")
+		jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
+
+		// Change the services' node ports.
+
+		By("changing the TCP service's " + serviceName + " NodePort")
+		tcpService = jig.ChangeServiceNodePortOrFail(ns1, tcpService.Name, tcpNodePort)
+		jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
+		tcpNodePortOld := tcpNodePort
+		tcpNodePort = tcpService.Spec.Ports[0].NodePort
+		if tcpNodePort == tcpNodePortOld {
+			Failf("TCP Spec.Ports[0].NodePort (%d) did not change", tcpNodePort)
+		}
+		if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
+			Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
+		}
+		Logf("TCP node port: %d", tcpNodePort)
+
+		By("changing the UDP service's " + serviceName + " NodePort")
+		udpService = jig.ChangeServiceNodePortOrFail(ns2, udpService.Name, udpNodePort)
+		jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
+		udpNodePortOld := udpNodePort
+		udpNodePort = udpService.Spec.Ports[0].NodePort
+		if udpNodePort == udpNodePortOld {
+			Failf("UDP Spec.Ports[0].NodePort (%d) did not change", udpNodePort)
+		}
+		if getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
+			Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
+		}
+		Logf("UDP node port: %d", udpNodePort)
+
+		By("hitting the TCP service's new NodePort")
+		jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
+
+		By("hitting the UDP service's new NodePort")
+		jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
+
+		By("checking the old TCP NodePort is closed")
+		jig.TestNotReachableHTTP(nodeIP, tcpNodePortOld, kubeProxyLagTimeout)
+
+		By("checking the old UDP NodePort is closed")
+		jig.TestNotReachableUDP(nodeIP, udpNodePortOld, kubeProxyLagTimeout)
+
+		By("hitting the TCP service's LoadBalancer")
+		jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
+
+		By("hitting the UDP service's LoadBalancer")
+		jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
+
+		// Change the services' main ports.
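+		// Bumping Spec.Ports[0].Port should leave the NodePort and the
+		// load-balancer ingress address unchanged; only the frontend port moves,
+		// though the cloud provider may need to rebuild forwarding rules, hence
+		// loadBalancerCreateTimeout on the LB reachability checks below.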
+
+		By("changing the TCP service's port")
+		tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
+			s.Spec.Ports[0].Port++
+		})
+		jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
+		svcPortOld := svcPort
+		svcPort = tcpService.Spec.Ports[0].Port
+		if svcPort == svcPortOld {
+			Failf("TCP Spec.Ports[0].Port (%d) did not change", svcPort)
+		}
+		if tcpService.Spec.Ports[0].NodePort != tcpNodePort {
+			Failf("TCP Spec.Ports[0].NodePort (%d) changed", tcpService.Spec.Ports[0].NodePort)
+		}
+		if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
+			Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
+		}
+		Logf("service port (TCP and UDP): %d", svcPort)
+
+		By("changing the UDP service's port")
+		udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
+			s.Spec.Ports[0].Port++
+		})
+		jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
+		if udpService.Spec.Ports[0].Port != svcPort {
+			Failf("UDP Spec.Ports[0].Port (%d) does not match the TCP service's new port (%d)", udpService.Spec.Ports[0].Port, svcPort)
+		}
+		if udpService.Spec.Ports[0].NodePort != udpNodePort {
+			Failf("UDP Spec.Ports[0].NodePort (%d) changed", udpService.Spec.Ports[0].NodePort)
+		}
+		if getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
+			Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
+		}
+
+		By("hitting the TCP service's NodePort")
+		jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
+
+		By("hitting the UDP service's NodePort")
+		jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
+
+		By("hitting the TCP service's LoadBalancer")
+		jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB
+
+		By("hitting the UDP service's LoadBalancer")
+		jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB
+
+		// Change the services back to ClusterIP.
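+		// Reverting to ClusterIP requires clearing NodePort explicitly. The
+		// cloud load balancer is torn down asynchronously, so the test waits via
+		// WaitForLoadBalancerDestroyOrFail before asserting that the old node
+		// ports and ingress addresses no longer respond.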
+ + By("changing TCP service " + serviceName + " back to type=ClusterIP") + tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) { + s.Spec.Type = api.ServiceTypeClusterIP + s.Spec.Ports[0].NodePort = 0 + }) + // Wait for the load balancer to be destroyed asynchronously + tcpService = jig.WaitForLoadBalancerDestroyOrFail(ns1, tcpService.Name, tcpIngressIP, svcPort) + jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP) + + By("changing UDP service " + serviceName + " back to type=ClusterIP") + udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) { + s.Spec.Type = api.ServiceTypeClusterIP + s.Spec.Ports[0].NodePort = 0 + }) + // Wait for the load balancer to be destroyed asynchronously + udpService = jig.WaitForLoadBalancerDestroyOrFail(ns2, udpService.Name, udpIngressIP, svcPort) + jig.SanityCheckService(udpService, api.ServiceTypeClusterIP) + + By("checking the TCP NodePort is closed") + jig.TestNotReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout) + + By("checking the UDP NodePort is closed") + jig.TestNotReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout) + + By("checking the TCP LoadBalancer is closed") + jig.TestNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout) + + By("checking the UDP LoadBalancer is closed") + jig.TestNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout) }) It("should prevent NodePort collisions", func() { + // TODO: use the ServiceTestJig here baseName := "nodeport-collision-" serviceName1 := baseName + "1" serviceName2 := baseName + "2" @@ -663,6 +720,7 @@ var _ = Describe("Services", func() { }) It("should check NodePort out-of-range", func() { + // TODO: use the ServiceTestJig here serviceName := "nodeport-range-test" ns := f.Namespace.Name @@ -729,6 +787,7 @@ var _ = Describe("Services", func() { }) It("should release NodePorts on delete", func() { + // TODO: use the ServiceTestJig here serviceName := "nodeport-reuse" ns := f.Namespace.Name @@ -781,118 +840,6 @@ var _ = Describe("Services", func() { service, err = t.CreateService(service) Expect(err).NotTo(HaveOccurred()) }) - - // This test hits several load-balancer cases because LB turnup is slow. 
- // Flaky issue #18952 - It("should serve identically named services in different namespaces on different load-balancers [Flaky]", func() { - // requires ExternalLoadBalancer - SkipUnlessProviderIs("gce", "gke", "aws") - - ns1 := f.Namespace.Name - - By("Building a second namespace api object") - namespacePtr, err := createTestingNS("services", c, nil) - Expect(err).NotTo(HaveOccurred()) - ns2 := namespacePtr.Name - extraNamespaces = append(extraNamespaces, ns2) - - serviceName := "test-svc" - servicePort := 9376 - - By("creating service " + serviceName + " with load balancer in namespace " + ns1) - t1 := NewServerTest(c, ns1, serviceName) - svc1 := t1.BuildServiceSpec() - svc1.Spec.Type = api.ServiceTypeLoadBalancer - svc1.Spec.Ports[0].Port = servicePort - svc1.Spec.Ports[0].TargetPort = intstr.FromInt(80) - _, err = t1.CreateService(svc1) - Expect(err).NotTo(HaveOccurred()) - - By("creating pod to be part of service " + serviceName + " in namespace " + ns1) - t1.CreateWebserverRC(1) - - loadBalancerIP := "" - if providerIs("gce", "gke") { - By("creating a static IP") - rand.Seed(time.Now().UTC().UnixNano()) - staticIPName := fmt.Sprintf("e2e-external-lb-test-%d", rand.Intn(65535)) - loadBalancerIP, err = createGCEStaticIP(staticIPName) - Expect(err).NotTo(HaveOccurred()) - defer func() { - // Release GCE static IP - this is not kube-managed and will not be automatically released. - deleteGCEStaticIP(staticIPName) - }() - } - - By("creating service " + serviceName + " with UDP load balancer in namespace " + ns2) - t2 := NewNetcatTest(c, ns2, serviceName) - svc2 := t2.BuildServiceSpec() - svc2.Spec.Type = api.ServiceTypeLoadBalancer - svc2.Spec.Ports[0].Port = servicePort - // UDP loadbalancing is tested via test NetcatTest - svc2.Spec.Ports[0].Protocol = api.ProtocolUDP - svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80) - svc2.Spec.LoadBalancerIP = loadBalancerIP - _, err = t2.CreateService(svc2) - Expect(err).NotTo(HaveOccurred()) - - By("creating pod to be part of service " + serviceName + " in namespace " + ns2) - t2.CreateNetcatRC(2) - - ingressPoints := []string{} - svcs := []*api.Service{svc1, svc2} - for _, svc := range svcs { - namespace := svc.Namespace - lbip := svc.Spec.LoadBalancerIP - // This isn't actually part of the test, but it has the net effect of deleting the target pool and forwarding - // rule, so that we don't leak them - defer removeExternalLoadBalancer(f, svc) - - // Wait for the load balancer to be created asynchronously, which is - // currently indicated by ingress point(s) being added to the status. 
- result, err := waitForLoadBalancerIngress(c, serviceName, namespace) - Expect(err).NotTo(HaveOccurred()) - if len(result.Status.LoadBalancer.Ingress) != 1 { - Failf("got unexpected number (%v) of ingress points for externally load balanced service: %v", result.Status.LoadBalancer.Ingress, result) - } - ingress := result.Status.LoadBalancer.Ingress[0] - if len(result.Spec.Ports) != 1 { - Failf("got unexpected len(Spec.Ports) for LoadBalancer service: %v", result) - } - if lbip != "" { - Expect(ingress.IP).To(Equal(lbip)) - } - port := result.Spec.Ports[0] - if port.NodePort == 0 { - Failf("got unexpected Spec.Ports[0].nodePort for LoadBalancer service: %v", result) - } - if !ServiceNodePortRange.Contains(port.NodePort) { - Failf("got unexpected (out-of-range) port for LoadBalancer service: %v", result) - } - ing := result.Status.LoadBalancer.Ingress[0].IP - if ing == "" { - ing = result.Status.LoadBalancer.Ingress[0].Hostname - } - ingressPoints = append(ingressPoints, ing) // Save 'em to check uniqueness - - if svc1.Spec.Ports[0].Protocol == api.ProtocolTCP { - By("hitting the pod through the service's NodePort") - ip := pickNodeIP(c) - if err := wait.PollImmediate(poll, kubeProxyLagTimeout, func() (bool, error) { return testReachable(ip, port.NodePort) }); err != nil { - Failf("Could not reach nodePort service through node-ip %v:%v in %v", ip, port.NodePort, kubeProxyLagTimeout) - } - By("hitting the pod through the service's external load balancer") - testLoadBalancerReachable(ingress, servicePort) - } else { - By("hitting the pod through the service's NodePort") - testNetcatReachable(pickNodeIP(c), port.NodePort) - - By("hitting the pod through the service's external load balancer") - testNetcatLoadBalancerReachable(ingress, servicePort) - } - } - validateUniqueOrFail(ingressPoints) - }) }) // updateService fetches a service, calls the update function on it, @@ -918,66 +865,6 @@ func updateService(c *client.Client, namespace, serviceName string, update func( return service, err } -func waitForLoadBalancerIngress(c *client.Client, serviceName, namespace string) (*api.Service, error) { - // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable - const timeout = 20 * time.Minute - var service *api.Service - By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have a LoadBalancer ingress point", timeout, serviceName, namespace)) - i := 1 - for start := time.Now(); time.Since(start) < timeout; time.Sleep(3 * time.Second) { - service, err := c.Services(namespace).Get(serviceName) - if err != nil { - Logf("Get service failed, ignoring for 5s: %v", err) - continue - } - if len(service.Status.LoadBalancer.Ingress) > 0 { - return service, nil - } - if i%5 == 0 { - Logf("Waiting for service %s in namespace %s to have a LoadBalancer ingress point (%v)", serviceName, namespace, time.Since(start)) - } - i++ - } - return service, fmt.Errorf("service %s in namespace %s doesn't have a LoadBalancer ingress point after %.2f seconds", serviceName, namespace, timeout.Seconds()) -} - -func waitForLoadBalancerDestroy(c *client.Client, serviceIP, servicePort, serviceName, namespace string) (*api.Service, error) { - // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable - const timeout = 10 * time.Minute - var service *api.Service - defer func() { - if err := EnsureLoadBalancerResourcesDeleted(serviceIP, servicePort); err != nil { - Logf("Failed to delete cloud resources for service: %s %s (%v)", serviceIP, 
servicePort, err) - } - }() - By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to have no LoadBalancer ingress points", timeout, serviceName, namespace)) - for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) { - service, err := c.Services(namespace).Get(serviceName) - if err != nil { - Logf("Get service failed, ignoring for 5s: %v", err) - continue - } - if len(service.Status.LoadBalancer.Ingress) == 0 { - return service, nil - } - Logf("Waiting for service %s in namespace %s to have no LoadBalancer ingress points (%v)", serviceName, namespace, time.Since(start)) - } - - return service, fmt.Errorf("service %s in namespace %s still has LoadBalancer ingress points after %.2f seconds", serviceName, namespace, timeout.Seconds()) -} - -func validateUniqueOrFail(s []string) { - By(fmt.Sprintf("validating unique: %v", s)) - sort.Strings(s) - var prev string - for i, elem := range s { - if i > 0 && elem == prev { - Fail("duplicate found: " + elem) - } - prev = elem - } -} - func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID { m := PortsByPodUID{} for _, ss := range endpoints.Subsets { @@ -1181,50 +1068,8 @@ func pickNodeIP(c *client.Client) string { return ip } -func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool { - return testLoadBalancerReachableInTime(ingress, port, podStartTimeout) -} - -func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool { - return testNetcatLoadBalancerReachableInTime(ingress, port, podStartTimeout) -} - -func conditionFuncDecorator(ip string, port int, fn func(string, int) (bool, error)) wait.ConditionFunc { - return func() (bool, error) { - return fn(ip, port) - } -} - -func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { - ip := ingress.IP - if ip == "" { - ip = ingress.Hostname - } - - return testReachableInTime(conditionFuncDecorator(ip, port, testReachable), timeout) - -} - -func testNetcatLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool { - ip := ingress.IP - if ip == "" { - ip = ingress.Hostname - } - - return testReachableInTime(conditionFuncDecorator(ip, port, testNetcatReachable), timeout) -} - -func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) { - ip := ingress.IP - if ip == "" { - ip = ingress.Hostname - } - - testNotReachable(ip, port) -} - -func testReachable(ip string, port int) (bool, error) { - url := fmt.Sprintf("http://%s:%d", ip, port) +func testReachableHTTP(ip string, port int, request string, expect string) (bool, error) { + url := fmt.Sprintf("http://%s:%d%s", ip, port, request) if ip == "" { Failf("Got empty IP for reachability check (%s)", url) return false, nil @@ -1234,11 +1079,11 @@ func testReachable(ip string, port int) (bool, error) { return false, nil } - Logf("Testing reachability of %v", url) + Logf("Testing HTTP reachability of %v", url) resp, err := httpGetNoConnectionPool(url) if err != nil { - Logf("Got error waiting for reachability of %s: %v", url, err) + Logf("Got error testing for reachability of %s: %v", url, err) return false, nil } defer resp.Body.Close() @@ -1250,14 +1095,36 @@ func testReachable(ip string, port int) (bool, error) { if resp.StatusCode != 200 { return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body)) } - if !strings.Contains(string(body), "test-webserver") { - return false, 
fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body)) + if !strings.Contains(string(body), expect) { + return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body)) } Logf("Successfully reached %v", url) return true, nil } -func testNetcatReachable(ip string, port int) (bool, error) { +func testNotReachableHTTP(ip string, port int) (bool, error) { + url := fmt.Sprintf("http://%s:%d", ip, port) + if ip == "" { + Failf("Got empty IP for non-reachability check (%s)", url) + return false, nil + } + if port == 0 { + Failf("Got port==0 for non-reachability check (%s)", url) + return false, nil + } + + Logf("Testing HTTP non-reachability of %v", url) + + resp, err := httpGetNoConnectionPool(url) + if err != nil { + Logf("Confirmed that %s is not reachable", url) + return true, nil + } + resp.Body.Close() + return false, nil +} + +func testReachableUDP(ip string, port int, request string, expect string) (bool, error) { uri := fmt.Sprintf("udp://%s:%d", ip, port) if ip == "" { Failf("Got empty IP for reachability check (%s)", uri) @@ -1268,70 +1135,77 @@ func testNetcatReachable(ip string, port int) (bool, error) { return false, nil } - Logf("Testing reachability of %v", uri) + Logf("Testing UDP reachability of %v", uri) - con, err := net.Dial("udp", ip+":"+string(port)) + con, err := net.Dial("udp", ip+":"+strconv.Itoa(port)) if err != nil { - return false, fmt.Errorf("Failed to connect to: %s:%d (%s)", ip, port, err.Error()) + return false, fmt.Errorf("Failed to dial %s:%d: %v", ip, port, err) } - _, err = con.Write([]byte("\n")) + _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) if err != nil { - return false, fmt.Errorf("Failed to send newline: %s", err.Error()) + return false, fmt.Errorf("Failed to send request: %v", err) } - var buf []byte = make([]byte, len("SUCCESS")+1) + var buf []byte = make([]byte, len(expect)+1) + + err = con.SetDeadline(time.Now().Add(3 * time.Second)) + if err != nil { + return false, fmt.Errorf("Failed to set deadline: %v", err) + } _, err = con.Read(buf) if err != nil { - return false, fmt.Errorf("Failed to read result: %s", err.Error()) + return false, nil } - if !strings.HasPrefix(string(buf), "SUCCESS") { - return false, fmt.Errorf("Failed to retrieve: \"SUCCESS\"") + if !strings.Contains(string(buf), expect) { + return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf)) } - Logf("Successfully retrieved \"SUCCESS\"") + Logf("Successfully reached %v", uri) return true, nil } -func testReachableInTime(testFunc wait.ConditionFunc, timeout time.Duration) bool { - By(fmt.Sprintf("Waiting up to %v", timeout)) - err := wait.PollImmediate(poll, timeout, testFunc) - if err != nil { - Expect(err).NotTo(HaveOccurred(), "Error waiting") - return false - } - return true -} - -func testNotReachable(ip string, port int) { - url := fmt.Sprintf("http://%s:%d", ip, port) +func testNotReachableUDP(ip string, port int, request string) (bool, error) { + uri := fmt.Sprintf("udp://%s:%d", ip, port) if ip == "" { - Failf("Got empty IP for non-reachability check (%s)", url) + Failf("Got empty IP for reachability check (%s)", uri) + return false, nil } if port == 0 { - Failf("Got port==0 for non-reachability check (%s)", url) + Failf("Got port==0 for reachability check (%s)", uri) + return false, nil } - desc := fmt.Sprintf("the url %s to be *not* reachable", url) - By(fmt.Sprintf("Waiting up to %v for %s", podStartTimeout, desc)) - err := wait.PollImmediate(poll, 
podStartTimeout, func() (bool, error) { - resp, err := httpGetNoConnectionPool(url) - if err != nil { - Logf("Successfully waited for %s", desc) - return true, nil - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - Logf("Expecting %s to be unreachable but was reachable and got an error reading response: %v", url, err) - return false, nil - } - Logf("Able to reach service %s when should no longer have been reachable, status: %q and body: %s", url, resp.Status, string(body)) - return false, nil - }) - Expect(err).NotTo(HaveOccurred(), "Error waiting for %s", desc) + Logf("Testing UDP non-reachability of %v", uri) + + con, err := net.Dial("udp", ip+":"+strconv.Itoa(port)) + if err != nil { + Logf("Confirmed that %s is not reachable", uri) + return true, nil + } + + _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) + if err != nil { + Logf("Confirmed that %s is not reachable", uri) + return true, nil + } + + var buf []byte = make([]byte, 1) + + err = con.SetDeadline(time.Now().Add(3 * time.Second)) + if err != nil { + return false, fmt.Errorf("Failed to set deadline: %v", err) + } + + _, err = con.Read(buf) + if err != nil { + Logf("Confirmed that %s is not reachable", uri) + return true, nil + } + + return false, nil } // Creates a replication controller that serves its hostname and a service on top of it. @@ -1502,6 +1376,349 @@ func httpGetNoConnectionPool(url string) (*http.Response, error) { return client.Get(url) } +// A test jig to help testing. +type ServiceTestJig struct { + ID string + Name string + Client *client.Client + Labels map[string]string +} + +// NewServiceTestJig allocates and inits a new ServiceTestJig. +func NewServiceTestJig(client *client.Client, name string) *ServiceTestJig { + j := &ServiceTestJig{} + j.Client = client + j.Name = name + j.ID = j.Name + "-" + string(util.NewUUID()) + j.Labels = map[string]string{"testid": j.ID} + + return j +} + +// newServiceTemplate returns the default api.Service template for this jig, but +// does not actually create the Service. The default Service has the same name +// as the jig and exposes port 80. +func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol) *api.Service { + service := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Namespace: namespace, + Name: j.Name, + Labels: j.Labels, + }, + Spec: api.ServiceSpec{ + Selector: j.Labels, + Ports: []api.ServicePort{ + { + Protocol: proto, + Port: 80, + }, + }, + }, + } + return service +} + +// CreateTCPServiceOrFail creates a new TCP Service based on the jig's +// defaults. Callers can provide a function to tweak the Service object before +// it is created. +func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service { + svc := j.newServiceTemplate(namespace, api.ProtocolTCP) + if tweak != nil { + tweak(svc) + } + result, err := j.Client.Services(namespace).Create(svc) + if err != nil { + Failf("Failed to create TCP Service %q: %v", svc.Name, err) + } + return result +} + +// CreateUDPServiceOrFail creates a new UDP Service based on the jig's +// defaults. Callers can provide a function to tweak the Service object before +// it is created. 
+func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service { + svc := j.newServiceTemplate(namespace, api.ProtocolUDP) + if tweak != nil { + tweak(svc) + } + result, err := j.Client.Services(namespace).Create(svc) + if err != nil { + Failf("Failed to create UDP Service %q: %v", svc.Name, err) + } + return result +} + +func (j *ServiceTestJig) SanityCheckService(svc *api.Service, svcType api.ServiceType) { + if svc.Spec.Type != svcType { + Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType) + } + expectNodePorts := false + if svcType != api.ServiceTypeClusterIP { + expectNodePorts = true + } + for i, port := range svc.Spec.Ports { + hasNodePort := (port.NodePort != 0) + if hasNodePort != expectNodePorts { + Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort) + } + if hasNodePort { + if !ServiceNodePortRange.Contains(port.NodePort) { + Failf("out-of-range nodePort (%d) for service", port.NodePort) + } + } + } + expectIngress := false + if svcType == api.ServiceTypeLoadBalancer { + expectIngress = true + } + hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0 + if hasIngress != expectIngress { + Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress)) + } + if hasIngress { + for i, ing := range svc.Status.LoadBalancer.Ingress { + if ing.IP == "" && ing.Hostname == "" { + Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing) + } + } + } +} + +// UpdateService fetches a service, calls the update function on it, and +// then attempts to send the updated service. It tries up to 3 times in the +// face of timeouts and conflicts. +func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*api.Service)) (*api.Service, error) { + for i := 0; i < 3; i++ { + service, err := j.Client.Services(namespace).Get(name) + if err != nil { + return nil, fmt.Errorf("Failed to get Service %q: %v", name, err) + } + + update(service) + service, err = j.Client.Services(namespace).Update(service) + if err == nil { + return service, nil + } + if !errors.IsConflict(err) && !errors.IsServerTimeout(err) { + return nil, fmt.Errorf("Failed to update Service %q: %v", name, err) + } + } + return nil, fmt.Errorf("Too many retries updating Service %q", name) +} + +// UpdateServiceOrFail fetches a service, calls the update function on it, and +// then attempts to send the updated service. It tries up to 3 times in the +// face of timeouts and conflicts. 
+func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*api.Service)) *api.Service { + svc, err := j.UpdateService(namespace, name, update) + if err != nil { + Failf(err.Error()) + } + return svc +} + +func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *api.Service { + var err error + var service *api.Service + for i := 1; i < ServiceNodePortRange.Size; i++ { + offs1 := initial - ServiceNodePortRange.Base + offs2 := (offs1 + i) % ServiceNodePortRange.Size + newPort := ServiceNodePortRange.Base + offs2 + service, err = j.UpdateService(namespace, name, func(s *api.Service) { + s.Spec.Ports[0].NodePort = newPort + }) + if err != nil && strings.Contains(err.Error(), "provided port is already allocated") { + Logf("tried nodePort %d, but it is in use, will try another", newPort) + continue + } + // Otherwise err was nil or err was a real error + break + } + if err != nil { + Failf("Could not change the nodePort: %v", err) + } + return service +} + +func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string) *api.Service { + var service *api.Service + Logf("Waiting up to %v for service %q to have a LoadBalancer", loadBalancerCreateTimeout, name) + pollFunc := func() (bool, error) { + svc, err := j.Client.Services(namespace).Get(name) + if err != nil { + return false, err + } + if len(svc.Status.LoadBalancer.Ingress) > 0 { + service = svc + return true, nil + } + return false, nil + } + if err := wait.PollImmediate(poll, loadBalancerCreateTimeout, pollFunc); err != nil { + Failf("Timeout waiting for service %q to have a load balancer", name) + } + return service +} + +func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int) *api.Service { + // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable + defer func() { + if err := EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil { + Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err) + } + }() + + var service *api.Service + Logf("Waiting up to %v for service %q to have no LoadBalancer", loadBalancerCreateTimeout, name) + pollFunc := func() (bool, error) { + svc, err := j.Client.Services(namespace).Get(name) + if err != nil { + return false, err + } + if len(svc.Status.LoadBalancer.Ingress) == 0 { + service = svc + return true, nil + } + return false, nil + } + if err := wait.PollImmediate(poll, loadBalancerCreateTimeout, pollFunc); err != nil { + Failf("Timeout waiting for service %q to have no load balancer", name) + } + return service +} + +func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) { + if err := wait.PollImmediate(poll, timeout, func() (bool, error) { return testReachableHTTP(host, port, "/echo?msg=hello", "hello") }); err != nil { + Failf("Could not reach HTTP service through %v:%v after %v: %v", host, port, timeout, err) + } +} + +func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) { + if err := wait.PollImmediate(poll, timeout, func() (bool, error) { return testNotReachableHTTP(host, port) }); err != nil { + Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err) + } +} + +func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { + if err := wait.PollImmediate(poll, timeout, func() (bool, error) { return testReachableUDP(host, port, "echo hello", "hello") }); err != nil { + 
Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err) + } +} + +func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) { + if err := wait.PollImmediate(poll, timeout, func() (bool, error) { return testNotReachableUDP(host, port, "echo hello") }); err != nil { + Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err) + } +} + +func getIngressPoint(ing *api.LoadBalancerIngress) string { + host := ing.IP + if host == "" { + host = ing.Hostname + } + return host +} + +// newRCTemplate returns the default api.ReplicationController object for +// this jig, but does not actually create the RC. The default RC has the same +// name as the jig and runs the "netexec" container. +func (j *ServiceTestJig) newRCTemplate(namespace string) *api.ReplicationController { + rc := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Namespace: namespace, + Name: j.Name, + Labels: j.Labels, + }, + Spec: api.ReplicationControllerSpec{ + Replicas: 1, + Selector: j.Labels, + Template: &api.PodTemplateSpec{ + ObjectMeta: api.ObjectMeta{ + Labels: j.Labels, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "netexec", + Image: "gcr.io/google_containers/netexec:1.4", + Args: []string{"--http-port=80", "--udp-port=80"}, + ReadinessProbe: &api.Probe{ + PeriodSeconds: 3, + Handler: api.Handler{ + HTTPGet: &api.HTTPGetAction{ + Port: intstr.FromInt(80), + Path: "/hostName", + }, + }, + }, + }, + }, + TerminationGracePeriodSeconds: new(int64), + }, + }, + }, + } + return rc +} + +// RunOrFail creates a ReplicationController and Pod(s) and waits for the +// Pod(s) to be running. Callers can provide a function to tweak the RC object +// before it is created. +func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *api.ReplicationController)) *api.ReplicationController { + rc := j.newRCTemplate(namespace) + if tweak != nil { + tweak(rc) + } + result, err := j.Client.ReplicationControllers(namespace).Create(rc) + if err != nil { + Failf("Failed to created RC %q: %v", rc.Name, err) + } + pods, err := j.waitForPodsCreated(namespace, rc.Spec.Replicas) + if err != nil { + Failf("Failed to create pods: %v", err) + } + if err := j.waitForPodsReady(namespace, pods); err != nil { + Failf("Failed waiting for pods to be running: %v", err) + } + return result +} + +func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) { + timeout := 2 * time.Minute + // List the pods, making sure we observe all the replicas. 
+ label := labels.SelectorFromSet(labels.Set(j.Labels)) + Logf("Waiting up to %v for %d pods to be created", timeout, replicas) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { + options := api.ListOptions{LabelSelector: label} + pods, err := j.Client.Pods(namespace).List(options) + if err != nil { + return nil, err + } + + found := []string{} + for _, pod := range pods.Items { + if pod.DeletionTimestamp != nil { + continue + } + found = append(found, pod.Name) + } + if len(found) == replicas { + Logf("Found all %d pods", replicas) + return found, nil + } + Logf("Found %d/%d pods - will retry", len(found), replicas) + } + return nil, fmt.Errorf("Timeout waiting for %d pods to be created", replicas) +} + +func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error { + timeout := 2 * time.Minute + if !checkPodsRunningReady(j.Client, namespace, pods, timeout) { + return fmt.Errorf("Timeout waiting for %d pods to be ready") + } + return nil +} + // Simple helper class to avoid too much boilerplate in tests type ServiceTestFixture struct { ServiceName string @@ -1536,25 +1753,6 @@ func NewServerTest(client *client.Client, namespace string, serviceName string) return t } -func NewNetcatTest(client *client.Client, namespace string, serviceName string) *ServiceTestFixture { - t := &ServiceTestFixture{} - t.Client = client - t.Namespace = namespace - t.ServiceName = serviceName - t.TestId = t.ServiceName + "-" + string(util.NewUUID()) - t.Labels = map[string]string{ - "testid": t.TestId, - } - - t.rcs = make(map[string]bool) - t.services = make(map[string]bool) - - t.name = "netcat" - t.image = "ubuntu" - - return t -} - // Build default config for a service (which can then be changed) func (t *ServiceTestFixture) BuildServiceSpec() *api.Service { service := &api.Service{ @@ -1587,22 +1785,6 @@ func (t *ServiceTestFixture) CreateWebserverRC(replicas int) *api.ReplicationCon return rcAct } -// CreateNetcatRC creates rc-backed pods with a netcat listener -// configuration and records it for cleanup. -func (t *ServiceTestFixture) CreateNetcatRC(replicas int) *api.ReplicationController { - rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolUDP, t.Labels) - rcSpec.Spec.Template.Spec.Containers[0].Command = []string{"/bin/bash"} - rcSpec.Spec.Template.Spec.Containers[0].Args = []string{"-c", "echo SUCCESS | nc -q 0 -u -l 0.0.0.0 80"} - rcAct, err := t.createRC(rcSpec) - if err != nil { - Failf("Failed to create rc %s: %v", rcSpec.Name, err) - } - if err := verifyPods(t.Client, t.Namespace, t.name, false, replicas); err != nil { - Failf("Failed to create %d pods with name %s: %v", replicas, t.name, err) - } - return rcAct -} - // createRC creates a replication controller and records it for cleanup. func (t *ServiceTestFixture) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) { rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc) @@ -1661,35 +1843,3 @@ func (t *ServiceTestFixture) Cleanup() []error { return errs } - -func removeExternalLoadBalancer(f *Framework, svc *api.Service) { - By("changing service " + svc.Name + " back to type=ClusterIP") - service, err := updateService(f.Client, f.Namespace.Name, svc.Name, func(s *api.Service) { - s.Spec.Type = api.ServiceTypeClusterIP - s.Spec.Ports[0].NodePort = 0 - }) - Expect(err).NotTo(HaveOccurred()) - - // Updating the service type shouldn't change the Status immediately. The status should be - // updated after waitForLoadBalancerDestroy. 
- if len(service.Status.LoadBalancer.Ingress) == 0 { - Failf("got unexpected len(Status.LoadBalancer.Ingress) for NodePort service: %v", service) - } - if service.Spec.Type != api.ServiceTypeClusterIP { - Failf("got unexpected Spec.Type for back-to-ClusterIP service: %v", service) - } - if len(service.Spec.Ports) != 1 { - Failf("got unexpected len(Spec.Ports) for back-to-ClusterIP service: %v", service) - } - if service.Spec.Ports[0].NodePort != 0 { - Failf("got unexpected Spec.Ports[0].nodePort for back-to-ClusterIP service: %v", service) - } - - // Wait for the load balancer to be destroyed asynchronously - service, err = waitForLoadBalancerDestroy(f.Client, svc.Status.LoadBalancer.Ingress[0].IP, strconv.Itoa(svc.Spec.Ports[0].Port), svc.Name, f.Namespace.Name) - Expect(err).NotTo(HaveOccurred()) - - if len(service.Status.LoadBalancer.Ingress) != 0 { - Failf("got unexpected len(Status.LoadBalancer.Ingress) for back-to-ClusterIP service: %v", service) - } -} diff --git a/test/e2e/util.go b/test/e2e/util.go index ee5a4852495..810153fc0ec 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -744,13 +744,13 @@ func deleteNS(c *client.Client, namespace string, timeout time.Duration) error { return nil } -// Waits default ammount of time (podStartTimeout) for the specified pod to become running. +// Waits default amount of time (podStartTimeout) for the specified pod to become running. // Returns an error if timeout occurs first, or pod goes in to failed state. func waitForPodRunningInNamespace(c *client.Client, podName string, namespace string) error { return waitTimeoutForPodRunningInNamespace(c, podName, namespace, podStartTimeout) } -// Waits an extended ammount of time (slowPodStartTimeout) for the specified pod to become running. +// Waits an extended amount of time (slowPodStartTimeout) for the specified pod to become running. // Returns an error if timeout occurs first, or pod goes in to failed state. func waitForPodRunningInNamespaceSlow(c *client.Client, podName string, namespace string) error { return waitTimeoutForPodRunningInNamespace(c, podName, namespace, slowPodStartTimeout) @@ -2340,7 +2340,7 @@ func getSigner(provider string) (ssh.Signer, error) { // in namespace ns are running and ready, using c and waiting at most timeout. func checkPodsRunningReady(c *client.Client, ns string, podNames []string, timeout time.Duration) bool { np, desc := len(podNames), "running and ready" - Logf("Waiting up to %v for the following %d pods to be %s: %s", timeout, np, desc, podNames) + Logf("Waiting up to %v for %d pods to be %s: %s", timeout, np, desc, podNames) result := make(chan bool, len(podNames)) for ix := range podNames { // Launch off pod readiness checkers.