Rewrite services shell test in Go.

This commit is contained in:
Wojciech Tyczynski 2015-07-16 14:38:47 +02:00
parent 1a49ba1bdb
commit fdd7f1e4b2
4 changed files with 296 additions and 0 deletions

View File

@ -16,6 +16,11 @@
# Verifies that services and virtual IPs work.
# TODO(wojtek-t): Remove this test once the following go tests are stable:
# - "should work after restarting kube-proxy"
# - "should work after restarting apiserver"
# Run in strict mode: abort on a failing command (errexit), treat expansion of
# an unset variable as an error (nounset), and make a pipeline fail when any
# of its stages fails rather than only the last one (pipefail).
set -o errexit
set -o nounset
set -o pipefail

View File

@ -94,6 +94,7 @@ GCE_PARALLEL_SKIP_TESTS=(
"Nodes\sNetwork"
"Nodes\sResize"
"MaxPods"
"Services.*restarting"
"Shell.*services"
)

View File

@ -230,6 +230,131 @@ var _ = Describe("Services", func() {
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
})
It("should be able to up and down services", func() {
	// All services in this test share one namespace, replica count and port.
	ns := namespaces[0]
	replicas, port := 3, 80

	firstPods, firstIP, err := startServeHostnameService(c, ns, "service1", port, replicas)
	Expect(err).NotTo(HaveOccurred())
	secondPods, secondIP, err := startServeHostnameService(c, ns, "service2", port, replicas)
	Expect(err).NotTo(HaveOccurred())

	// Every verification below is run over SSH on the first host returned
	// by NodeSSHHosts.
	sshHosts, err := NodeSSHHosts(c)
	Expect(err).NotTo(HaveOccurred())
	if len(sshHosts) == 0 {
		Failf("No ssh-able nodes")
	}
	node := sshHosts[0]

	// Both services must answer before anything is torn down.
	expectNoError(verifyServeHostnameServiceUp(c, node, firstPods, firstIP, port))
	expectNoError(verifyServeHostnameServiceUp(c, node, secondPods, secondIP, port))

	// Take service1 down: it must stop answering while service2 keeps working.
	expectNoError(stopServeHostnameService(c, ns, "service1"))
	expectNoError(verifyServeHostnameServiceDown(c, node, firstIP, port))
	expectNoError(verifyServeHostnameServiceUp(c, node, secondPods, secondIP, port))

	// Bring up a third service; it must not share a VIP with service2 and
	// both must be reachable.
	thirdPods, thirdIP, err := startServeHostnameService(c, ns, "service3", port, replicas)
	Expect(err).NotTo(HaveOccurred())
	if secondIP == thirdIP {
		Failf("VIPs conflict: %v", secondIP)
	}
	expectNoError(verifyServeHostnameServiceUp(c, node, secondPods, secondIP, port))
	expectNoError(verifyServeHostnameServiceUp(c, node, thirdPods, thirdIP, port))

	// Clean up the two services that are still running.
	expectNoError(stopServeHostnameService(c, ns, "service2"))
	expectNoError(stopServeHostnameService(c, ns, "service3"))
})
It("should work after restarting kube-proxy", func() {
	SkipUnlessProviderIs("gce", "gke")

	ns := namespaces[0]
	replicas, port := 3, 80

	// Each teardown is deferred before the matching start call, so it runs
	// when the test body exits regardless of how far the test got.
	defer func() { expectNoError(stopServeHostnameService(c, ns, "service1")) }()
	firstPods, firstIP, err := startServeHostnameService(c, ns, "service1", port, replicas)
	Expect(err).NotTo(HaveOccurred())
	defer func() { expectNoError(stopServeHostnameService(c, ns, "service2")) }()
	secondPods, secondIP, err := startServeHostnameService(c, ns, "service2", port, replicas)
	Expect(err).NotTo(HaveOccurred())

	if firstIP == secondIP {
		Failf("VIPs conflict: %v", firstIP)
	}

	// Verifications and the kube-proxy restart all target the first host
	// returned by NodeSSHHosts.
	sshHosts, err := NodeSSHHosts(c)
	Expect(err).NotTo(HaveOccurred())
	if len(sshHosts) == 0 {
		Failf("No ssh-able nodes")
	}
	node := sshHosts[0]

	// Baseline: both services answer before anything is disturbed.
	expectNoError(verifyServeHostnameServiceUp(c, node, firstPods, firstIP, port))
	expectNoError(verifyServeHostnameServiceUp(c, node, secondPods, secondIP, port))

	// After a kube-proxy restart the services must become reachable again
	// (the verification helper retries for a while).
	if restartErr := restartKubeProxy(node); restartErr != nil {
		Failf("error restarting kube-proxy: %v", restartErr)
	}
	expectNoError(verifyServeHostnameServiceUp(c, node, firstPods, firstIP, port))
	expectNoError(verifyServeHostnameServiceUp(c, node, secondPods, secondIP, port))

	// Flush the two KUBE-PORTALS nat chains on the node and expect the
	// services to become reachable again.
	By("Remove iptable rules and make sure they come back")
	flushPortals := `
sudo iptables -t nat -F KUBE-PORTALS-HOST || true;
sudo iptables -t nat -F KUBE-PORTALS-CONTAINER || true`
	_, _, code, err := SSH(flushPortals, node, testContext.Provider)
	if err != nil || code != 0 {
		Failf("couldn't remove iptable rules: %v (code %v)", err, code)
	}
	expectNoError(verifyServeHostnameServiceUp(c, node, firstPods, firstIP, port))
	expectNoError(verifyServeHostnameServiceUp(c, node, secondPods, secondIP, port))
})
It("should work after restarting apiserver", func() {
	// TODO: restartApiserver doesn't work in GKE - fix it and reenable this test.
	SkipUnlessProviderIs("gce")

	ns := namespaces[0]
	numPods, servicePort := 3, 80

	// Teardown is deferred before the start call so it runs when the test
	// body exits, whatever happens afterwards.
	defer func() { expectNoError(stopServeHostnameService(c, ns, "service1")) }()
	podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
	// Check the start error right away: the next `err` assignment would
	// otherwise overwrite it and the test would carry on with an empty IP.
	Expect(err).NotTo(HaveOccurred())

	// All verifications run over SSH on the first host returned by NodeSSHHosts.
	hosts, err := NodeSSHHosts(c)
	Expect(err).NotTo(HaveOccurred())
	if len(hosts) == 0 {
		Failf("No ssh-able nodes")
	}
	host := hosts[0]

	expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))

	// Restart apiserver
	if err := restartApiserver(); err != nil {
		Failf("error restarting apiserver: %v", err)
	}
	if err := waitForApiserverUp(c); err != nil {
		Failf("error while waiting for apiserver up: %v", err)
	}
	expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))

	// Create a new service and check if it's not reusing IP.
	defer func() { expectNoError(stopServeHostnameService(c, ns, "service2")) }()
	podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
	// Without this check a failed start leaves svc2IP empty, which trivially
	// differs from svc1IP and hides the real error.
	Expect(err).NotTo(HaveOccurred())

	if svc1IP == svc2IP {
		Failf("VIPs conflict: %v", svc1IP)
	}
	expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
	expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
})
It("should be able to create a functioning external load balancer", func() {
// requires ExternalLoadBalancer
SkipUnlessProviderIs("gce", "gke", "aws")
@ -1112,6 +1237,131 @@ func testNotReachable(ip string, port int) {
Expect(err).NotTo(HaveOccurred(), "Error waiting for %s", desc)
}
// startServeHostnameService creates a service called name in namespace ns and
// then runs a replication controller of the same name using the serve_hostname
// image. The service listens on port, forwards to target port 9376 over TCP and
// selects pods labelled "name": name.
//
// On success it returns the sorted names of the created pods and the service's
// cluster IP. On any failure it returns an empty IP and the error; the returned
// name slice always has length replicas but may be only partly filled.
func startServeHostnameService(c *client.Client, ns, name string, port, replicas int) ([]string, string, error) {
	podNames := make([]string, replicas)

	// The service is created first, before any backing pods exist.
	svcDef := &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name: name,
		},
		Spec: api.ServiceSpec{
			Ports: []api.ServicePort{{
				Port:       port,
				TargetPort: util.NewIntOrStringFromInt(9376),
				Protocol:   "TCP",
			}},
			Selector: map[string]string{
				"name": name,
			},
		},
	}
	if _, err := c.Services(ns).Create(svcDef); err != nil {
		return podNames, "", err
	}

	// RunRC receives a pointer to createdPods through the config; the slice
	// is read back below once RunRC has returned.
	var createdPods []*api.Pod
	maxContainerFailures := 0
	rcConfig := RCConfig{
		Client:               c,
		Image:                "gcr.io/google_containers/serve_hostname:1.1",
		Name:                 name,
		Namespace:            ns,
		PollInterval:         3 * time.Second,
		Timeout:              30 * time.Second,
		Replicas:             replicas,
		CreatedPods:          &createdPods,
		MaxContainerFailures: &maxContainerFailures,
	}
	if err := RunRC(rcConfig); err != nil {
		return podNames, "", err
	}

	if got := len(createdPods); got != replicas {
		return podNames, "", fmt.Errorf("Incorrect number of running pods: %v", got)
	}
	for i, pod := range createdPods {
		podNames[i] = pod.ObjectMeta.Name
	}
	sort.StringSlice(podNames).Sort()

	// Re-read the service to get the cluster IP; a blank IP is an error.
	created, err := c.Services(ns).Get(name)
	if err != nil {
		return podNames, "", err
	}
	clusterIP := created.Spec.ClusterIP
	if clusterIP == "" {
		return podNames, "", fmt.Errorf("Service IP is blank for %v", name)
	}
	return podNames, clusterIP, nil
}
// stopServeHostnameService tears down what startServeHostnameService set up:
// it calls DeleteRC for the replication controller called name in namespace
// ns, then deletes the service of the same name. The first failure is
// returned; if DeleteRC fails the service is not deleted.
func stopServeHostnameService(c *client.Client, ns, name string) error {
	rcErr := DeleteRC(c, ns, name)
	if rcErr != nil {
		return rcErr
	}
	return c.Services(ns).Delete(name)
}
// verifyServeHostnameServiceUp checks, over SSH on host, that requests to
// http://serviceIP:servicePort are answered by exactly the pods named in
// expectedPods.
//
// The check runs twice: once directly on the node and once from inside a
// busybox container started with docker on that node. Each check issues
// 3*len(expectedPods) wget requests, de-duplicates the responses and compares
// them with expectedPods. The observed names are sorted before comparison, so
// expectedPods should be sorted too (startServeHostnameService returns them
// sorted).
//
// Each check is retried every 5 seconds for up to a minute. An error naming
// the failing command is returned if a check never passes.
func verifyServeHostnameServiceUp(c *client.Client, host string, expectedPods []string, serviceIP string, servicePort int) error {
	// "|| true" keeps a single failed request from aborting the loop; the
	// trailing "echo" emits a newline after each response.
	command := fmt.Sprintf(
		"for i in $(seq 1 %d); do wget -q -T 1 -O - http://%s:%d || true; echo; done",
		3*len(expectedPods), serviceIP, servicePort)

	commands := []string{
		// verify service from node
		fmt.Sprintf(`set -e; %s | sort -n | uniq`, command),
		// verify service from container
		fmt.Sprintf(`set -e;
sudo docker pull gcr.io/google_containers/busybox > /dev/null;
sudo docker run gcr.io/google_containers/busybox sh -c '%v' | sort -n | uniq`,
			command),
	}

	for _, cmd := range commands {
		passed := false
		// Sleep 5 seconds between attempts. The bare integer 5 used before
		// is 5 nanoseconds as a time.Duration, so the loop hammered the node
		// with SSH commands for the whole minute.
		for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
			stdout, _, code, err := SSH(cmd, host, testContext.Provider)
			if err != nil || code != 0 {
				Logf("error while SSH-ing to node: %v (code %v)", err, code)
			}
			pods := strings.Split(strings.TrimSpace(stdout), "\n")
			sort.StringSlice(pods).Sort()
			if api.Semantic.DeepEqual(pods, expectedPods) {
				passed = true
				break
			}
			Logf("Expected pods: %v, got: %v", expectedPods, pods)
		}
		if !passed {
			return fmt.Errorf("service verification failed for:\n %s", cmd)
		}
	}
	return nil
}
// verifyServeHostnameServiceDown waits until http://serviceIP:servicePort no
// longer answers when probed with curl over SSH on host.
//
// The remote command exits with status 99 only when curl succeeds, so 99
// means "service still alive" and any other status means it is down. The
// probe is retried every 5 seconds for up to a minute. It returns nil once the
// service is seen to be down, or an error on timeout.
func verifyServeHostnameServiceDown(c *client.Client, host string, serviceIP string, servicePort int) error {
	command := fmt.Sprintf(
		"curl -s --connect-timeout 2 http://%s:%d && exit 99", serviceIP, servicePort)

	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		_, _, code, err := SSH(command, host, testContext.Provider)
		if err != nil {
			// A failed SSH call gives no usable exit code, so it must not
			// count as "service is down"; retry instead.
			// NOTE(review): assumes SSH returns a non-nil err only when the
			// command could not be run, not for a non-zero exit - confirm.
			Logf("error while SSH-ing to node: %v", err)
			continue
		}
		if code != 99 {
			return nil
		}
		Logf("service still alive - still waiting")
	}
	return fmt.Errorf("waiting for service to be down timed out")
}
// Does an HTTP GET, but does not reuse TCP connections.
// Reusing connections would mask problems where the iptables rule has changed, but we don't see it.
// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout.

View File

@ -1740,3 +1740,43 @@ func parseKVLines(output, key string) string {
}
return ""
}
// restartKubeProxy restarts kube-proxy on host by running its init script with
// sudo over SSH. It returns an error for providers other than gce, gke and
// aws, when the SSH call fails, or when the command exits non-zero.
func restartKubeProxy(host string) error {
	// TODO: Make it work for all providers.
	if supported := providerIs("gce", "gke", "aws"); !supported {
		return fmt.Errorf("unsupported provider: %s", testContext.Provider)
	}

	_, _, exitCode, sshErr := SSH("sudo /etc/init.d/kube-proxy restart", host, testContext.Provider)
	if sshErr == nil && exitCode == 0 {
		return nil
	}
	return fmt.Errorf("couldn't restart kube-proxy: %v (code %v)", sshErr, exitCode)
}
// restartApiserver restarts the apiserver by running a provider-specific
// command over SSH on port 22 of the host returned by getMasterHost. It
// returns an error for providers other than gce, gke and aws, when the SSH
// call fails, or when the command exits non-zero.
func restartApiserver() error {
	// TODO: Make it work for all providers.
	if !providerIs("gce", "gke", "aws") {
		return fmt.Errorf("unsupported provider: %s", testContext.Provider)
	}

	// Default (aws here): restart through the init script. On gce/gke the
	// docker container whose `docker ps` line matches /kube-apiserver is
	// killed instead - presumably something restarts it afterwards; confirm.
	restartCmd := "sudo /etc/init.d/kube-apiserver restart"
	if providerIs("gce", "gke") {
		restartCmd = "sudo docker ps | grep /kube-apiserver | cut -d ' ' -f 1 | xargs sudo docker kill"
	}

	_, _, exitCode, sshErr := SSH(restartCmd, getMasterHost()+":22", testContext.Provider)
	if sshErr == nil && exitCode == 0 {
		return nil
	}
	return fmt.Errorf("couldn't restart apiserver: %v (code %v)", sshErr, exitCode)
}
// waitForApiserverUp polls the /healthz path through c until the request
// succeeds and the response body is exactly "ok". It retries every 5 seconds
// for up to a minute and returns an error if that never happens.
func waitForApiserverUp(c *client.Client) error {
	begin := time.Now()
	for time.Since(begin) < time.Minute {
		resp, reqErr := c.Get().AbsPath("/healthz").Do().Raw()
		healthy := reqErr == nil && string(resp) == "ok"
		if healthy {
			return nil
		}
		time.Sleep(5 * time.Second)
	}
	return fmt.Errorf("waiting for apiserver timed out")
}