From 632a0a81d40dc1bab5ecc47b6004ec73a6c57e16 Mon Sep 17 00:00:00 2001
From: Jay Vyas
Date: Thu, 31 Mar 2016 14:45:08 -0400
Subject: [PATCH] Cluster verification framework supporting declarative
 definition and iteration against the pod spectrum

- rebase: ForEach only on Running pods
- add waitFor step in guestbook describe and wrapper
- simplify logs in polling, make panics immediate, and report rolled-up
  stats in the logs. Improve logging for ForEach failures.
---
 test/e2e/examples.go            |  90 ++++++++---------
 test/e2e/framework/framework.go | 173 ++++++++++++++++++++++++++++++++
 test/e2e/kubectl.go             |  34 ++++++-
 3 files changed, 243 insertions(+), 54 deletions(-)

diff --git a/test/e2e/examples.go b/test/e2e/examples.go
index 68f6d75626b..047ca0281d4 100644
--- a/test/e2e/examples.go
+++ b/test/e2e/examples.go
@@ -27,7 +27,6 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
-	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/test/e2e/framework"

 	. "github.com/onsi/ginkgo"
@@ -40,6 +39,14 @@ const (
 var _ = framework.KubeDescribe("[Feature:Example]", func() {
 	f := framework.NewDefaultFramework("examples")
+	// Customized ForEach wrapper for this test.
+	forEachPod := func(selectorKey string, selectorValue string, fn func(api.Pod)) {
+		f.NewClusterVerification(
+			framework.PodStateVerification{
+				Selectors:   map[string]string{selectorKey: selectorValue},
+				ValidPhases: []api.PodPhase{api.PodRunning},
+			}).ForEach(fn)
+	}
 	var c *client.Client
 	var ns string
 	BeforeEach(func() {
@@ -85,13 +92,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("checking up the services")
 			checkAllLogs := func() {
-				forEachPod(c, ns, "name", "redis", func(pod api.Pod) {
+				forEachPod("name", "redis", func(pod api.Pod) {
 					if pod.Name != bootstrapPodName {
 						_, err := framework.LookForStringInLog(ns, pod.Name, "redis", expectedOnServer, serverStartTimeout)
 						Expect(err).NotTo(HaveOccurred())
 					}
 				})
-				forEachPod(c, ns, "name", "redis-sentinel", func(pod api.Pod) {
+				forEachPod("name", "redis-sentinel", func(pod api.Pod) {
 					if pod.Name != bootstrapPodName {
 						_, err := framework.LookForStringInLog(ns, pod.Name, "sentinel", expectedOnSentinel, serverStartTimeout)
 						Expect(err).NotTo(HaveOccurred())
@@ -124,7 +131,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("starting rabbitmq")
 			framework.RunKubectlOrDie("create", "-f", rabbitmqServiceYaml, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", rabbitmqControllerYaml, nsFlag)
-			forEachPod(c, ns, "component", "rabbitmq", func(pod api.Pod) {
+			forEachPod("component", "rabbitmq", func(pod api.Pod) {
 				_, err := framework.LookForStringInLog(ns, pod.Name, "rabbitmq", "Server startup complete", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
 			})
@@ -133,7 +140,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("starting celery")
 			framework.RunKubectlOrDie("create", "-f", celeryControllerYaml, nsFlag)
-			forEachPod(c, ns, "component", "celery", func(pod api.Pod) {
+			forEachPod("component", "celery", func(pod api.Pod) {
 				_, err := framework.LookForStringInFile(ns, pod.Name, "celery", "/data/celery.log", " ready.", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
 			})
@@ -141,14 +148,14 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			By("starting flower")
 			framework.RunKubectlOrDie("create", "-f", flowerServiceYaml, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", flowerControllerYaml, nsFlag)
-			forEachPod(c, ns, "component", "flower", func(pod api.Pod) {
-				// Do nothing. just wait for it to be up and running.
+			forEachPod("component", "flower", func(pod api.Pod) {
+				// Do nothing: just wait for the pod to be up and running.
 			})
 			content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
 			Expect(err).NotTo(HaveOccurred())
 			if !strings.Contains(content, "Celery Flower") {
 				framework.Failf("Flower HTTP request failed")
 			}
 		})
 	})
@@ -172,7 +181,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			framework.Logf("Now polling for Master startup...")
 			// Only one master pod: But its a natural way to look up pod names.
-			forEachPod(c, ns, "component", "spark-master", func(pod api.Pod) {
+			forEachPod("component", "spark-master", func(pod api.Pod) {
 				framework.Logf("Now waiting for master to startup in %v", pod.Name)
 				_, err := framework.LookForStringInLog(ns, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
@@ -181,6 +190,12 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			})
 			By("waiting for master endpoint")
 			err := framework.WaitForEndpoint(c, ns, "spark-master")
 			Expect(err).NotTo(HaveOccurred())
+			forEachPod("component", "spark-master", func(pod api.Pod) {
+				_, maErr := framework.LookForStringInLog(f.Namespace.Name, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
+				if maErr != nil {
+					framework.Failf("Didn't find target string. error: %v", maErr)
+				}
+			})
 		}
 		worker := func() {
 			By("starting workers")
@@ -191,10 +206,10 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
 			framework.RunKubectlOrDie("create", "-f", workerControllerJson, nsFlag)

 			// framework.ScaleRC(c, ns, "spark-worker-controller", 2, true)
 			framework.Logf("Now polling for worker startup...")
-			forEachPod(c, ns, "component", "spark-worker", func(pod api.Pod) {
-				_, err := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
-				Expect(err).NotTo(HaveOccurred())
-			})
+			forEachPod("component", "spark-worker", func(pod api.Pod) {
+				_, slaveErr := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
+				Expect(slaveErr).NotTo(HaveOccurred())
+			})
 		}
 		// Run the worker verification after we turn up the master.
 		defer worker()
@@ -231,7 +249,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {

 		// Create an RC with n nodes in it. Each node will then be verified.
By("Creating a Cassandra RC") framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag) - forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) { + forEachPod("app", "cassandra", func(pod api.Pod) { framework.Logf("Verifying pod %v ", pod.Name) _, err = framework.LookForStringInLog(ns, pod.Name, "cassandra", "Listening for thrift clients", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) @@ -241,7 +259,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { By("Finding each node in the nodetool status lines") output := framework.RunKubectlOrDie("exec", "cassandra", nsFlag, "--", "nodetool", "status") - forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) { + forEachPod("app", "cassandra", func(pod api.Pod) { if !strings.Contains(output, pod.Status.PodIP) { framework.Failf("Pod ip %s not found in nodetool status", pod.Status.PodIP) } @@ -285,7 +303,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { By("starting workers") framework.RunKubectlOrDie("create", "-f", workerControllerJson, nsFlag) - forEachPod(c, ns, "name", "storm-worker", func(pod api.Pod) { + forEachPod("name", "storm-worker", func(pod api.Pod) { //do nothing, just wait for the pod to be running }) // TODO: Add logging configuration to nimbus & workers images and then @@ -412,7 +430,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { framework.RunKubectlOrDie("create", "-f", driverServiceYaml, nsFlag) framework.RunKubectlOrDie("create", "-f", rethinkDbControllerYaml, nsFlag) checkDbInstances := func() { - forEachPod(c, ns, "db", "rethinkdb", func(pod api.Pod) { + forEachPod("db", "rethinkdb", func(pod api.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "rethinkdb", "Server ready", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) }) @@ -451,7 +469,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { By("starting hazelcast") framework.RunKubectlOrDie("create", "-f", serviceYaml, nsFlag) framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag) - forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) { + forEachPod("name", "hazelcast", func(pod api.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [1]", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) _, err = framework.LookForStringInLog(ns, pod.Name, "hazelcast", "is STARTED", serverStartTimeout) @@ -463,7 +481,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() { By("scaling hazelcast") framework.ScaleRC(c, ns, "hazelcast", 2, true) - forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) { + forEachPod("name", "hazelcast", func(pod api.Pod) { _, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout) Expect(err).NotTo(HaveOccurred()) }) @@ -501,29 +519,3 @@ func prepareResourceWithReplacedString(inputFile, old, new string) string { podYaml := strings.Replace(string(data), old, new, 1) return podYaml } - -func forEachPod(c *client.Client, ns, selectorKey, selectorValue string, fn func(api.Pod)) { - pods := []*api.Pod{} - for t := time.Now(); time.Since(t) < framework.PodListTimeout; time.Sleep(framework.Poll) { - selector := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue})) - options := api.ListOptions{LabelSelector: selector} - podList, err := c.Pods(ns).List(options) - Expect(err).NotTo(HaveOccurred()) - for _, pod := range podList.Items { - if pod.Status.Phase == api.PodPending || pod.Status.Phase == api.PodRunning { - pods = append(pods, &pod) - } - } - 
-		if len(pods) > 0 {
-			break
-		}
-	}
-	if pods == nil || len(pods) == 0 {
-		framework.Failf("No pods found")
-	}
-	for _, pod := range pods {
-		err := framework.WaitForPodRunningInNamespace(c, pod.Name, ns)
-		Expect(err).NotTo(HaveOccurred())
-		fn(*pod)
-	}
-}
diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go
index 97794eedec2..7cb857d2484 100644
--- a/test/e2e/framework/framework.go
+++ b/test/e2e/framework/framework.go
@@ -36,6 +36,8 @@ import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util/wait"
 )

 const (
@@ -423,3 +425,180 @@ func kubectlExec(namespace string, podName, containerName string, args ...string
 func KubeDescribe(text string, body func()) bool {
 	return Describe("[k8s.io] "+text, body)
 }
+
+// PodStateVerification represents a verification of pod state.
+// Any time you have a set of pods that you want to operate against or query,
+// this struct can be used to declaratively identify those pods.
+type PodStateVerification struct {
+	// Optional: only pods that have k=v labels will pass this filter.
+	Selectors map[string]string
+
+	// Required: the phases which are valid for your pod.
+	ValidPhases []api.PodPhase
+
+	// Optional: only pods passing this function will pass the filter.
+	// As an optimization, in addition to returning a boolean verdict, the
+	// function may return an error as well; a returned error indicates
+	// that polling of the pod spectrum should stop.
+	Verify func(api.Pod) (bool, error)
+
+	// Optional: only pods with this name will pass the filter.
+	PodName string
+}
+
+type ClusterVerification struct {
+	client    *client.Client
+	namespace *api.Namespace // Pointer rather than string, since the namespace isn't created until BeforeEach runs.
+	podState  PodStateVerification
+}
+
+func (f *Framework) NewClusterVerification(filter PodStateVerification) *ClusterVerification {
+	return &ClusterVerification{
+		f.Client,
+		f.Namespace,
+		filter,
+	}
+}
+
+func passesPodNameFilter(pod api.Pod, name string) bool {
+	return name == "" || strings.Contains(pod.Name, name)
+}
+
+func passesVerifyFilter(pod api.Pod, verify func(p api.Pod) (bool, error)) (bool, error) {
+	if verify == nil {
+		return true, nil
+	}
+	verified, err := verify(pod)
+	// If an error is returned, pod verification fails by definition.
+	if err != nil {
+		return false, err
+	}
+	return verified, nil
+}
+
+func passesPhasesFilter(pod api.Pod, validPhases []api.PodPhase) bool {
+	for _, phase := range validPhases {
+		if pod.Status.Phase == phase {
+			return true
+		}
+	}
+	return false
+}
+
+// filterLabels returns the pods in the given namespace that match the given label selectors.
+func filterLabels(selectors map[string]string, cli *client.Client, ns string) (*api.PodList, error) {
+	var err error
+	var selector labels.Selector
+	var pl *api.PodList
+	// List pods based on selectors. This might be a tiny optimization rather than filtering
+	// everything manually.
+	if len(selectors) > 0 {
+		selector = labels.SelectorFromSet(labels.Set(selectors))
+		options := api.ListOptions{LabelSelector: selector}
+		pl, err = cli.Pods(ns).List(options)
+	} else {
+		pl, err = cli.Pods(ns).List(api.ListOptions{})
+	}
+	return pl, err
+}
+
+// filter filters pods which pass a filter. It can be used to compose
+// the more useful abstractions like ForEach, WaitFor, and so on, which
+// can be used directly by tests.
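+//
+// A sketch of the intended composition, using this commit's API (the selector
+// and counts below are illustrative only):
+//
+//	cv := f.NewClusterVerification(PodStateVerification{
+//		Selectors:   map[string]string{"name": "redis"},
+//		ValidPhases: []api.PodPhase{api.PodRunning},
+//	})
+//	pods, err := cv.WaitFor(2, 2*time.Minute) // block until at least 2 pods match
+//	cv.ForEach(func(p api.Pod) { Logf("verified pod %v", p.Name) })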
+func (p *PodStateVerification) filter(c *client.Client, namespace *api.Namespace) ([]api.Pod, error) {
+	if len(p.ValidPhases) == 0 || namespace == nil {
+		panic(fmt.Errorf("Need to specify at least one valid pod phase (%v) and a namespace (%v).", p.ValidPhases, namespace))
+	}
+
+	ns := namespace.Name
+	pl, err := filterLabels(p.Selectors, c, ns) // Build an api.PodList to operate against.
+	if err != nil || len(pl.Items) == 0 {
+		return nil, err
+	}
+	Logf("Selector matched %v pods for %v", len(pl.Items), p.Selectors)
+
+	unfilteredPods := pl.Items
+	filteredPods := []api.Pod{}
+	// A pod must pass every filter the user specified: phase, name, and Verify.
+	for _, pod := range unfilteredPods {
+		if !(passesPhasesFilter(pod, p.ValidPhases) && passesPodNameFilter(pod, p.PodName)) {
+			continue
+		}
+		passesVerify, err := passesVerifyFilter(pod, p.Verify)
+		if err != nil {
+			Logf("Error detected on %v: %v", pod.Name, err)
+			// Return the pods verified so far, along with the error that stops the polling.
+			return filteredPods, err
+		}
+		if passesVerify {
+			filteredPods = append(filteredPods, pod)
+		}
+	}
+	return filteredPods, nil
+}
+
+// WaitFor waits for some minimum number of pods to be verified, according to the PodStateVerification
+// definition.
+func (cl *ClusterVerification) WaitFor(atLeast int, timeout time.Duration) ([]api.Pod, error) {
+	pods := []api.Pod{}
+	var returnedErr error
+
+	err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
+		pods, returnedErr = cl.podState.filter(cl.client, cl.namespace)
+
+		// Failure
+		if returnedErr != nil {
+			Logf("Cutting polling short: We got an error from the pod filtering layer.")
+			// Stop polling if the pod filtering returns an error; that should never happen.
+			// It indicates, for example, that the client is broken or something non-pod related.
+			return false, returnedErr
+		}
+		Logf("Found %v / %v", len(pods), atLeast)

+		// Success
+		if len(pods) >= atLeast {
+			return true, nil
+		}
+		// Keep trying...
+		return false, nil
+	})
+	Logf("WaitFor completed with %v pods matching (wanted at least %v, timeout %v)", len(pods), atLeast, timeout)
+	return pods, err
+}
+
+// WaitForOrFail is a shorthand for WaitFor that fails the test if anything goes wrong.
+func (cl *ClusterVerification) WaitForOrFail(atLeast int, timeout time.Duration) {
+	pods, err := cl.WaitFor(atLeast, timeout)
+	if err != nil || len(pods) < atLeast {
+		Failf("Verified %v of %v pods, error: %v", len(pods), atLeast, err)
+	}
+}
+
+// ForEach runs a function against every verifiable pod. Be warned that this doesn't wait for "n" pods to verify,
+// so it may return very quickly if you have strict pod state requirements.
+//
+// For example, if you require at least 5 pods to be running before your test will pass,
+// it's smart to first call "clusterVerification.WaitFor(5)" before you call clusterVerification.ForEach.
+func (cl *ClusterVerification) ForEach(podFunc func(api.Pod)) error {
+	pods, err := cl.podState.filter(cl.client, cl.namespace)
+	if err == nil {
+		Logf("ForEach: Found %v pods from the filter. Now looping through them.", len(pods))
+		for _, p := range pods {
+			podFunc(p)
+		}
+	} else {
+		Logf("ForEach: Something went wrong when filtering pods to execute against: %v", err)
+	}
+
+	return err
+}
diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go
index ed65d4593be..6d16362a3ca 100644
--- a/test/e2e/kubectl.go
+++ b/test/e2e/kubectl.go
@@ -115,6 +115,28 @@ var (
 var _ = framework.KubeDescribe("Kubectl client", func() {
 	defer GinkgoRecover()
 	f := framework.NewDefaultFramework("kubectl")
+
+	// Reusable cluster state function. This won't be adversely affected by lazy initialization of the framework.
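+	// (clusterState is a function rather than a precomputed value because the
+	// framework's Client and Namespace are only populated in BeforeEach; the
+	// verification therefore has to be constructed lazily, at call time.)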
+	clusterState := func() *framework.ClusterVerification {
+		return f.NewClusterVerification(
+			framework.PodStateVerification{
+				Selectors:   map[string]string{"app": "redis"},
+				ValidPhases: []api.PodPhase{api.PodRunning /*api.PodPending*/},
+			})
+	}
+	// Customized Wait / ForEach wrapper for this test. These demonstrate the
+	// idiomatic way to wrap ClusterVerification for syntactic sugar in large
+	// test files.
+	waitFor := func(atLeast int) {
+		// 60 seconds can be flaky for some of the containers, so wait for up to 90.
+		clusterState().WaitFor(atLeast, 90*time.Second)
+	}
+	forEachPod := func(podFunc func(p api.Pod)) {
+		clusterState().ForEach(podFunc)
+	}
 	var c *client.Client
 	var ns string
 	BeforeEach(func() {
@@ -588,8 +607,10 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 			framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
 			framework.RunKubectlOrDie("create", "-f", serviceJson, nsFlag)

+			// Wait for the redis pods to come online...
+			waitFor(1)
 			// Pod
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				output := framework.RunKubectlOrDie("describe", "pod", pod.Name, nsFlag)
 				requiredStrings := [][]string{
 					{"Name:", "redis-master-"},
@@ -684,8 +705,11 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 			redisPort := 6379

 			By("creating Redis RC")
+
+			framework.Logf("namespace %v", ns)
 			framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
+				framework.Logf("waiting for pod %v in namespace %v", pod.Name, ns)
 				framework.LookForStringInLog(ns, pod.Name, "redis-master", "The server is now ready to accept connections", framework.PodStartTimeout)
 			})
 			validateService := func(name string, servicePort int, timeout time.Duration) {
@@ -799,7 +823,7 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 		It("should be able to retrieve and filter logs [Conformance]", func() {
 			framework.SkipUnlessServerVersionGTE(extendedPodLogFilterVersion, c)

-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				By("checking for a matching strings")
 				_, err := framework.LookForStringInLog(ns, pod.Name, containerName, "The server is now ready to accept connections", framework.PodStartTimeout)
 				Expect(err).NotTo(HaveOccurred())
@@ -850,12 +874,12 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
 			By("creating Redis RC")
 			framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
 			By("patching all pods")
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				framework.RunKubectlOrDie("patch", "pod", pod.Name, nsFlag, "-p", "{\"metadata\":{\"annotations\":{\"x\":\"y\"}}}")
 			})
 			By("checking annotations")
-			forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+			forEachPod(func(pod api.Pod) {
 				found := false
 				for key, val := range pod.Annotations {
 					if key == "x" && val == "y" {