From fdd7f1e4b254e92dccfe4808f2057fc71e841c6e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 16 Jul 2015 14:38:47 +0200 Subject: [PATCH] Rewrite services shell test in Go. --- hack/e2e-suite/services.sh | 5 + hack/jenkins/e2e.sh | 1 + test/e2e/service.go | 250 +++++++++++++++++++++++++++++++++++++ test/e2e/util.go | 40 ++++++ 4 files changed, 296 insertions(+) diff --git a/hack/e2e-suite/services.sh b/hack/e2e-suite/services.sh index 3e3c59c3e66..35d1cf01f95 100755 --- a/hack/e2e-suite/services.sh +++ b/hack/e2e-suite/services.sh @@ -16,6 +16,11 @@ # Verifies that services and virtual IPs work. + +# TODO(wojtek-t): Remove this test once the following go tests are stable: +# - "should work after restarting kube-proxy" +# - "should work after restarting apiserver" + set -o errexit set -o nounset set -o pipefail diff --git a/hack/jenkins/e2e.sh b/hack/jenkins/e2e.sh index f98ce20c5de..3a0ce9c376c 100755 --- a/hack/jenkins/e2e.sh +++ b/hack/jenkins/e2e.sh @@ -94,6 +94,7 @@ GCE_PARALLEL_SKIP_TESTS=( "Nodes\sNetwork" "Nodes\sResize" "MaxPods" + "Services.*restarting" "Shell.*services" ) diff --git a/test/e2e/service.go b/test/e2e/service.go index ccd97b28f72..45658356223 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -230,6 +230,131 @@ var _ = Describe("Services", func() { validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) }) + It("should be able to up and down services", func() { + ns := namespaces[0] + numPods, servicePort := 3, 80 + + podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods) + Expect(err).NotTo(HaveOccurred()) + podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods) + Expect(err).NotTo(HaveOccurred()) + + hosts, err := NodeSSHHosts(c) + Expect(err).NotTo(HaveOccurred()) + if len(hosts) == 0 { + Failf("No ssh-able nodes") + } + host := hosts[0] + + expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort)) + expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort)) + + // Stop service 1 and make sure it is gone. + expectNoError(stopServeHostnameService(c, ns, "service1")) + + expectNoError(verifyServeHostnameServiceDown(c, host, svc1IP, servicePort)) + expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort)) + + // Start another service and verify both are up. 
+		podNames3, svc3IP, err := startServeHostnameService(c, ns, "service3", servicePort, numPods)
+		Expect(err).NotTo(HaveOccurred())
+
+		if svc2IP == svc3IP {
+			Failf("VIPs conflict: %v", svc2IP)
+		}
+
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames3, svc3IP, servicePort))
+
+		expectNoError(stopServeHostnameService(c, ns, "service2"))
+		expectNoError(stopServeHostnameService(c, ns, "service3"))
+	})
+
+	It("should work after restarting kube-proxy", func() {
+		SkipUnlessProviderIs("gce", "gke")
+
+		ns := namespaces[0]
+		numPods, servicePort := 3, 80
+
+		defer func() { expectNoError(stopServeHostnameService(c, ns, "service1")) }()
+		podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
+		Expect(err).NotTo(HaveOccurred())
+		defer func() { expectNoError(stopServeHostnameService(c, ns, "service2")) }()
+		podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
+		Expect(err).NotTo(HaveOccurred())
+
+		if svc1IP == svc2IP {
+			Failf("VIPs conflict: %v", svc1IP)
+		}
+
+		hosts, err := NodeSSHHosts(c)
+		Expect(err).NotTo(HaveOccurred())
+		if len(hosts) == 0 {
+			Failf("No ssh-able nodes")
+		}
+		host := hosts[0]
+
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+
+		// Restart kube-proxy and verify that services are still reachable (after some time).
+		if err := restartKubeProxy(host); err != nil {
+			Failf("error restarting kube-proxy: %v", err)
+		}
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+
+		// Remove iptables rules and make sure they come back.
+		By("Remove iptables rules and make sure they come back")
+		_, _, code, err := SSH(`
+			sudo iptables -t nat -F KUBE-PORTALS-HOST || true;
+			sudo iptables -t nat -F KUBE-PORTALS-CONTAINER || true`, host, testContext.Provider)
+		if err != nil || code != 0 {
+			Failf("couldn't remove iptables rules: %v (code %v)", err, code)
+		}
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+	})
+
+	It("should work after restarting apiserver", func() {
+		// TODO: restartApiserver doesn't work in GKE - fix it and re-enable this test.
+		SkipUnlessProviderIs("gce")
+
+		ns := namespaces[0]
+		numPods, servicePort := 3, 80
+
+		defer func() { expectNoError(stopServeHostnameService(c, ns, "service1")) }()
+		podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
+		Expect(err).NotTo(HaveOccurred())
+		hosts, err := NodeSSHHosts(c)
+		Expect(err).NotTo(HaveOccurred())
+		if len(hosts) == 0 {
+			Failf("No ssh-able nodes")
+		}
+		host := hosts[0]
+
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+
+		// Restart apiserver
+		if err := restartApiserver(); err != nil {
+			Failf("error restarting apiserver: %v", err)
+		}
+		if err := waitForApiserverUp(c); err != nil {
+			Failf("error while waiting for apiserver up: %v", err)
+		}
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+
+		// Create a new service and check that it does not reuse the IP.
+		defer func() { expectNoError(stopServeHostnameService(c, ns, "service2")) }()
+		podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
+		Expect(err).NotTo(HaveOccurred())
+		if svc1IP == svc2IP {
+			Failf("VIPs conflict: %v", svc1IP)
+		}
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+	})
+
 	It("should be able to create a functioning external load balancer", func() {
 		// requires ExternalLoadBalancer
 		SkipUnlessProviderIs("gce", "gke", "aws")
@@ -1112,6 +1237,131 @@ func testNotReachable(ip string, port int) {
 	Expect(err).NotTo(HaveOccurred(), "Error waiting for %s", desc)
 }
 
+// Creates a replication controller that serves its hostname and a service on top of it.
+func startServeHostnameService(c *client.Client, ns, name string, port, replicas int) ([]string, string, error) {
+	podNames := make([]string, replicas)
+
+	_, err := c.Services(ns).Create(&api.Service{
+		ObjectMeta: api.ObjectMeta{
+			Name: name,
+		},
+		Spec: api.ServiceSpec{
+			Ports: []api.ServicePort{{
+				Port:       port,
+				TargetPort: util.NewIntOrStringFromInt(9376),
+				Protocol:   "TCP",
+			}},
+			Selector: map[string]string{
+				"name": name,
+			},
+		},
+	})
+	if err != nil {
+		return podNames, "", err
+	}
+
+	var createdPods []*api.Pod
+	maxContainerFailures := 0
+	config := RCConfig{
+		Client:               c,
+		Image:                "gcr.io/google_containers/serve_hostname:1.1",
+		Name:                 name,
+		Namespace:            ns,
+		PollInterval:         3 * time.Second,
+		Timeout:              30 * time.Second,
+		Replicas:             replicas,
+		CreatedPods:          &createdPods,
+		MaxContainerFailures: &maxContainerFailures,
+	}
+	err = RunRC(config)
+	if err != nil {
+		return podNames, "", err
+	}
+
+	if len(createdPods) != replicas {
+		return podNames, "", fmt.Errorf("Incorrect number of running pods: %v", len(createdPods))
+	}
+
+	for i := range createdPods {
+		podNames[i] = createdPods[i].ObjectMeta.Name
+	}
+	sort.StringSlice(podNames).Sort()
+
+	service, err := c.Services(ns).Get(name)
+	if err != nil {
+		return podNames, "", err
+	}
+	if service.Spec.ClusterIP == "" {
+		return podNames, "", fmt.Errorf("Service IP is blank for %v", name)
+	}
+	serviceIP := service.Spec.ClusterIP
+	return podNames, serviceIP, nil
+}
+
+func stopServeHostnameService(c *client.Client, ns, name string) error {
+	if err := DeleteRC(c, ns, name); err != nil {
+		return err
+	}
+	if err := c.Services(ns).Delete(name); err != nil {
+		return err
+	}
+	return nil
+}
+
+func verifyServeHostnameServiceUp(c *client.Client, host string, expectedPods []string, serviceIP string, servicePort int) error {
+	command := fmt.Sprintf(
+		"for i in $(seq 1 %d); do wget -q -T 1 -O - http://%s:%d || true; echo; done",
+		3*len(expectedPods), serviceIP, servicePort)
+
+	commands := []string{
+		// verify service from node
+		fmt.Sprintf(`set -e; %s | sort -n | uniq`, command),
+		// verify service from container
+		fmt.Sprintf(`set -e;
+			sudo docker pull gcr.io/google_containers/busybox > /dev/null;
+			sudo docker run gcr.io/google_containers/busybox sh -c '%v' | sort -n | uniq`,
+			command),
+	}
+
+	for _, cmd := range commands {
+		passed := false
+		for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
+			stdout, _, code, err := SSH(cmd, host, testContext.Provider)
+			if err != nil || code != 0 {
+				Logf("error while SSH-ing to node: %v (code %v)", err, code)
+			}
+			pods := strings.Split(strings.TrimSpace(stdout), "\n")
+			sort.StringSlice(pods).Sort()
+			if api.Semantic.DeepEqual(pods, expectedPods) {
+				passed = true
+				break
+ } + Logf("Expected pods: %v, got: %v", expectedPods, pods) + } + if !passed { + return fmt.Errorf("service verification failed for:\n %s", cmd) + } + } + return nil +} + +func verifyServeHostnameServiceDown(c *client.Client, host string, serviceIP string, servicePort int) error { + command := fmt.Sprintf( + "curl -s --connect-timeout 2 http://%s:%d && exit 99", serviceIP, servicePort) + + for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) { + _, _, code, err := SSH(command, host, testContext.Provider) + if err != nil { + Logf("error while SSH-ing to node: %v", err) + } + if code != 99 { + return nil + } + Logf("service still alive - still waiting") + } + return fmt.Errorf("waiting for service to be down timed out") +} + // Does an HTTP GET, but does not reuse TCP connections // This masks problems where the iptables rule has changed, but we don't see it // This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout diff --git a/test/e2e/util.go b/test/e2e/util.go index 39737aa0cf2..4eb388ff5af 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1740,3 +1740,43 @@ func parseKVLines(output, key string) string { } return "" } + +func restartKubeProxy(host string) error { + // TODO: Make it work for all providers. + if !providerIs("gce", "gke", "aws") { + return fmt.Errorf("unsupported provider: %s", testContext.Provider) + } + _, _, code, err := SSH("sudo /etc/init.d/kube-proxy restart", host, testContext.Provider) + if err != nil || code != 0 { + return fmt.Errorf("couldn't restart kube-proxy: %v (code %v)", err, code) + } + return nil +} + +func restartApiserver() error { + // TODO: Make it work for all providers. + if !providerIs("gce", "gke", "aws") { + return fmt.Errorf("unsupported provider: %s", testContext.Provider) + } + var command string + if providerIs("gce", "gke") { + command = "sudo docker ps | grep /kube-apiserver | cut -d ' ' -f 1 | xargs sudo docker kill" + } else { + command = "sudo /etc/init.d/kube-apiserver restart" + } + _, _, code, err := SSH(command, getMasterHost()+":22", testContext.Provider) + if err != nil || code != 0 { + return fmt.Errorf("couldn't restart apiserver: %v (code %v)", err, code) + } + return nil +} + +func waitForApiserverUp(c *client.Client) error { + for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) { + body, err := c.Get().AbsPath("/healthz").Do().Raw() + if err == nil && string(body) == "ok" { + return nil + } + } + return fmt.Errorf("waiting for apiserver timed out") +}
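
To exercise only the rewritten Go tests against a live cluster, a ginkgo focus regex matching the It() descriptions above should be enough. The invocation below is only a sketch: the hack/e2e.go flag names and the focus regex are assumptions based on the current e2e harness and may need adjusting for your checkout:

    # Sketch: run just the new service tests; flag names and the focus regex are
    # assumptions, adjust them to match your e2e harness.
    go run hack/e2e.go -v --test \
      --test_args="--ginkgo.focus=Services.*(up and down|restarting)"

Until "should work after restarting kube-proxy" and "should work after restarting apiserver" prove stable, the shell test keeps running (see the TODO in hack/e2e-suite/services.sh) and both restart tests stay skipped in the parallel Jenkins suite via the "Services.*restarting" entry above.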