diff --git a/test/e2e/service.go b/test/e2e/service.go
index 937270898e5..af498e2e13d 100644
--- a/test/e2e/service.go
+++ b/test/e2e/service.go
@@ -34,6 +34,7 @@ import (
 	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/intstr"
+	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
@@ -233,14 +234,14 @@ var _ = Describe("Services", func() {
 		}
 		host := hosts[0]
 
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
 
 		// Stop service 1 and make sure it is gone.
 		expectNoError(stopServeHostnameService(c, ns, "service1"))
 
 		expectNoError(verifyServeHostnameServiceDown(c, host, svc1IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
 
 		// Start another service and verify both are up.
 		podNames3, svc3IP, err := startServeHostnameService(c, ns, "service3", servicePort, numPods)
@@ -250,8 +251,8 @@ var _ = Describe("Services", func() {
 			Failf("VIPs conflict: %v", svc2IP)
 		}
 
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames3, svc3IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames3, svc3IP, servicePort))
 
 		expectNoError(stopServeHostnameService(c, ns, "service2"))
 		expectNoError(stopServeHostnameService(c, ns, "service3"))
@@ -285,15 +286,15 @@ var _ = Describe("Services", func() {
 		}
 		host := hosts[0]
 
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
 
 		By("Restarting kube-proxy")
 		if err := restartKubeProxy(host); err != nil {
 			Failf("error restarting kube-proxy: %v", err)
 		}
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
 
 		By("Removing iptable rules")
 		result, err := SSH(`
@@ -304,8 +305,8 @@ var _ = Describe("Services", func() {
 			LogSSHResult(result)
 			Failf("couldn't remove iptable rules: %v", err)
 		}
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
 	})
 
 	It("should work after restarting apiserver", func() {
@@ -326,7 +327,7 @@ var _ = Describe("Services", func() {
 		}
 		host := hosts[0]
 
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
 
 		// Restart apiserver
 		if err := restartApiserver(); err != nil {
@@ -335,7 +336,7 @@ var _ = Describe("Services", func() {
 		if err := waitForApiserverUp(c); err != nil {
 			Failf("error while waiting for apiserver up: %v", err)
 		}
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
 
 		// Create a new service and check if it's not reusing IP.
 		defer func() { expectNoError(stopServeHostnameService(c, ns, "service2")) }()
@@ -345,8 +346,8 @@ var _ = Describe("Services", func() {
 		if svc1IP == svc2IP {
 			Failf("VIPs conflict: %v", svc1IP)
 		}
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames1, svc1IP, servicePort))
-		expectNoError(verifyServeHostnameServiceUp(c, host, podNames2, svc2IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
+		expectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
 	})
 
 	It("should be able to create a functioning NodePort service", func() {
@@ -1060,6 +1061,39 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex
 	Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, serviceStartTimeout)
 }
 
+// createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
+// vessel for kubectl exec commands.
+func createExecPodOrFail(c *client.Client, ns, name string) {
+	Logf("Creating new exec pod")
+	immediate := int64(0)
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name:      name,
+			Namespace: ns,
+		},
+		Spec: api.PodSpec{
+			TerminationGracePeriodSeconds: &immediate,
+			Containers: []api.Container{
+				{
+					Name:    "exec",
+					Image:   "gcr.io/google_containers/busybox",
+					Command: []string{"sh", "-c", "while true; do sleep 5; done"},
+				},
+			},
+		},
+	}
+	_, err := c.Pods(ns).Create(pod)
+	Expect(err).NotTo(HaveOccurred())
+	err = wait.PollImmediate(poll, 5*time.Minute, func() (bool, error) {
+		retrievedPod, err := c.Pods(pod.Namespace).Get(pod.Name)
+		if err != nil {
+			return false, nil
+		}
+		return retrievedPod.Status.Phase == api.PodRunning, nil
+	})
+	Expect(err).NotTo(HaveOccurred())
+}
+
 func createPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) {
 	By(fmt.Sprintf("creating pod %s in namespace %s", name, ns))
 	pod := &api.Pod{
@@ -1293,40 +1327,69 @@ func stopServeHostnameService(c *client.Client, ns, name string) error {
 	return nil
 }
 
-func verifyServeHostnameServiceUp(c *client.Client, host string, expectedPods []string, serviceIP string, servicePort int) error {
+// verifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
+// given host and from within a pod. The host is expected to be an SSH-able node
+// in the cluster. Each pod in the service is expected to echo its name. These
+// names are compared with the given expectedPods list after a sort | uniq.
+func verifyServeHostnameServiceUp(c *client.Client, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
+	execPodName := "execpod"
+	createExecPodOrFail(c, ns, execPodName)
+	defer func() {
+		deletePodOrFail(c, ns, execPodName)
+	}()
 	command := fmt.Sprintf(
 		"for i in $(seq 1 %d); do wget -q -T 1 -O - http://%s:%d || true; echo; done",
 		3*len(expectedPods), serviceIP, servicePort)
 
-	commands := []string{
+	commands := []func() string{
 		// verify service from node
-		fmt.Sprintf(`set -e; %s | sort -n | uniq`, command),
-		// verify service from container
-		fmt.Sprintf(`set -e;
-			sudo docker pull gcr.io/google_containers/busybox > /dev/null;
-			sudo docker run gcr.io/google_containers/busybox sh -c '%v' | sort -n | uniq`,
-			command),
-	}
-
-	By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
-	for _, cmd := range commands {
-		passed := false
-		for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
+		func() string {
+			cmd := fmt.Sprintf(`set -e; %s`, command)
+			Logf("Executing cmd %v on host %v", cmd, host)
 			result, err := SSH(cmd, host, testContext.Provider)
 			if err != nil || result.Code != 0 {
 				LogSSHResult(result)
 				Logf("error while SSH-ing to node: %v", err)
 			}
-			pods := strings.Split(strings.TrimSpace(result.Stdout), "\n")
-			sort.StringSlice(pods).Sort()
-			if api.Semantic.DeepEqual(pods, expectedPods) {
+			return result.Stdout
+		},
+		// verify service from pod
+		func() string {
+			Logf("Executing cmd %v in pod %v/%v", command, ns, execPodName)
+			// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
+			output, err := RunHostCmd(ns, execPodName, command)
+			if err != nil {
+				Logf("error while kubectl execing %v in pod %v/%v: %v\nOutput: %v", command, ns, execPodName, err, output)
+			}
+			return output
+		},
+	}
+	sort.StringSlice(expectedPods).Sort()
+	By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
+	for _, cmdFunc := range commands {
+		passed := false
+		pods := []string{}
+		// Retry cmdFunc for up to 1 minute, polling every 5 seconds.
+		// TODO: make this stricter. Ideally hitting a Service with n pods n
+		// times should round robin to each pod, and one pass should suffice.
+		for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
+			pods = strings.Split(strings.TrimSpace(cmdFunc()), "\n")
+			// Uniq pods before the sort because inserting them into a set
+			// (which is implemented using maps) can re-order them.
+			uniquePods := sets.String{}
+			for _, name := range pods {
+				uniquePods.Insert(name)
+			}
+			sortedPods := uniquePods.List()
+			sort.StringSlice(sortedPods).Sort()
+			if api.Semantic.DeepEqual(sortedPods, expectedPods) {
 				passed = true
 				break
 			}
-			Logf("Waiting for expected pods for %s: %v, got: %v", serviceIP, expectedPods, pods)
+			Logf("Waiting for expected pods for %s: %v, got: %v", serviceIP, expectedPods, sortedPods)
 		}
 		if !passed {
-			return fmt.Errorf("service verification failed for:\n %s", cmd)
+			return fmt.Errorf("service verification failed for:\n %s, expected to retrieve pods %v, only retrieved %v", serviceIP, expectedPods, pods)
 		}
 	}
 	return nil