diff --git a/test/e2e/examples.go b/test/e2e/examples.go
index d449bfc1970..c2d68d67c8b 100644
--- a/test/e2e/examples.go
+++ b/test/e2e/examples.go
@@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
- "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
@@ -40,6 +39,14 @@ const (
var _ = framework.KubeDescribe("[Feature:Example]", func() {
f := framework.NewDefaultFramework("examples")
+ // Customized ForEach wrapper for this test.
+ forEachPod := func(selectorKey string, selectorValue string, fn func(api.Pod)) {
+ f.NewClusterVerification(
+ framework.PodStateVerification{
+ Selectors: map[string]string{selectorKey: selectorValue},
+ ValidPhases: []api.PodPhase{api.PodRunning},
+ }).ForEach(fn)
+ }
var c *client.Client
var ns string
BeforeEach(func() {
@@ -85,13 +92,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("checking up the services")
checkAllLogs := func() {
- forEachPod(c, ns, "name", "redis", func(pod api.Pod) {
+ forEachPod("name", "redis", func(pod api.Pod) {
if pod.Name != bootstrapPodName {
_, err := framework.LookForStringInLog(ns, pod.Name, "redis", expectedOnServer, serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
}
})
- forEachPod(c, ns, "name", "redis-sentinel", func(pod api.Pod) {
+ forEachPod("name", "redis-sentinel", func(pod api.Pod) {
if pod.Name != bootstrapPodName {
_, err := framework.LookForStringInLog(ns, pod.Name, "sentinel", expectedOnSentinel, serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -124,7 +131,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting rabbitmq")
framework.RunKubectlOrDie("create", "-f", rabbitmqServiceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", rabbitmqControllerYaml, nsFlag)
- forEachPod(c, ns, "component", "rabbitmq", func(pod api.Pod) {
+ forEachPod("component", "rabbitmq", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "rabbitmq", "Server startup complete", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -133,7 +140,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting celery")
framework.RunKubectlOrDie("create", "-f", celeryControllerYaml, nsFlag)
- forEachPod(c, ns, "component", "celery", func(pod api.Pod) {
+ forEachPod("component", "celery", func(pod api.Pod) {
_, err := framework.LookForStringInFile(ns, pod.Name, "celery", "/data/celery.log", " ready.", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -141,14 +148,16 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting flower")
framework.RunKubectlOrDie("create", "-f", flowerServiceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", flowerControllerYaml, nsFlag)
- forEachPod(c, ns, "component", "flower", func(pod api.Pod) {
- // Do nothing. just wait for it to be up and running.
+ forEachPod("component", "flower", func(pod api.Pod) {
+
+ })
+ forEachPod("component", "flower", func(pod api.Pod) {
+ content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
+ Expect(err).NotTo(HaveOccurred())
+ if !strings.Contains(content, "
Celery Flower") {
+ framework.Failf("Flower HTTP request failed")
+ }
})
- content, err := makeHttpRequestToService(c, ns, "flower-service", "/", framework.EndpointRegisterTimeout)
- Expect(err).NotTo(HaveOccurred())
- if !strings.Contains(content, "Celery Flower") {
- framework.Failf("Flower HTTP request failed")
- }
})
})
@@ -172,7 +181,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
framework.Logf("Now polling for Master startup...")
// Only one master pod: But its a natural way to look up pod names.
- forEachPod(c, ns, "component", "spark-master", func(pod api.Pod) {
+ forEachPod("component", "spark-master", func(pod api.Pod) {
framework.Logf("Now waiting for master to startup in %v", pod.Name)
_, err := framework.LookForStringInLog(ns, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -181,6 +190,12 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("waiting for master endpoint")
err := framework.WaitForEndpoint(c, ns, "spark-master")
Expect(err).NotTo(HaveOccurred())
+ forEachPod("component", "spark-master", func(pod api.Pod) {
+ _, maErr := framework.LookForStringInLog(f.Namespace.Name, pod.Name, "spark-master", "Starting Spark master at", serverStartTimeout)
+ if maErr != nil {
+ framework.Failf("Didn't find target string. error:", maErr)
+ }
+ })
}
worker := func() {
By("starting workers")
@@ -191,10 +206,13 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
// framework.ScaleRC(c, ns, "spark-worker-controller", 2, true)
framework.Logf("Now polling for worker startup...")
- forEachPod(c, ns, "component", "spark-worker", func(pod api.Pod) {
- _, err := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
- Expect(err).NotTo(HaveOccurred())
- })
+ // ScaleRC(c, ns, "spark-worker-controller", 2, true)
+ framework.Logf("Now polling for worker startup...")
+ forEachPod("component", "spark-worker",
+ func(pod api.Pod) {
+ _, slaveErr := framework.LookForStringInLog(ns, pod.Name, "spark-worker", "Successfully registered with master", serverStartTimeout)
+ Expect(slaveErr).NotTo(HaveOccurred())
+ })
}
// Run the worker verification after we turn up the master.
defer worker()
@@ -221,7 +239,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
// Create an RC with n nodes in it. Each node will then be verified.
By("Creating a Cassandra RC")
framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag)
- forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) {
+ forEachPod("app", "cassandra", func(pod api.Pod) {
framework.Logf("Verifying pod %v ", pod.Name)
_, err = framework.LookForStringInLog(ns, pod.Name, "cassandra", "Listening for thrift clients", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -231,7 +249,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("Finding each node in the nodetool status lines")
output := framework.RunKubectlOrDie("exec", "cassandra", nsFlag, "--", "nodetool", "status")
- forEachPod(c, ns, "app", "cassandra", func(pod api.Pod) {
+ forEachPod("app", "cassandra", func(pod api.Pod) {
if !strings.Contains(output, pod.Status.PodIP) {
framework.Failf("Pod ip %s not found in nodetool status", pod.Status.PodIP)
}
@@ -275,7 +293,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting workers")
framework.RunKubectlOrDie("create", "-f", workerControllerJson, nsFlag)
- forEachPod(c, ns, "name", "storm-worker", func(pod api.Pod) {
+ forEachPod("name", "storm-worker", func(pod api.Pod) {
//do nothing, just wait for the pod to be running
})
// TODO: Add logging configuration to nimbus & workers images and then
@@ -402,7 +420,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
framework.RunKubectlOrDie("create", "-f", driverServiceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", rethinkDbControllerYaml, nsFlag)
checkDbInstances := func() {
- forEachPod(c, ns, "db", "rethinkdb", func(pod api.Pod) {
+ forEachPod("db", "rethinkdb", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "rethinkdb", "Server ready", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -441,7 +459,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("starting hazelcast")
framework.RunKubectlOrDie("create", "-f", serviceYaml, nsFlag)
framework.RunKubectlOrDie("create", "-f", controllerYaml, nsFlag)
- forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
+ forEachPod("name", "hazelcast", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [1]", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
_, err = framework.LookForStringInLog(ns, pod.Name, "hazelcast", "is STARTED", serverStartTimeout)
@@ -453,7 +471,7 @@ var _ = framework.KubeDescribe("[Feature:Example]", func() {
By("scaling hazelcast")
framework.ScaleRC(c, ns, "hazelcast", 2, true)
- forEachPod(c, ns, "name", "hazelcast", func(pod api.Pod) {
+ forEachPod("name", "hazelcast", func(pod api.Pod) {
_, err := framework.LookForStringInLog(ns, pod.Name, "hazelcast", "Members [2]", serverStartTimeout)
Expect(err).NotTo(HaveOccurred())
})
@@ -491,29 +509,3 @@ func prepareResourceWithReplacedString(inputFile, old, new string) string {
podYaml := strings.Replace(string(data), old, new, 1)
return podYaml
}
-
-func forEachPod(c *client.Client, ns, selectorKey, selectorValue string, fn func(api.Pod)) {
- pods := []*api.Pod{}
- for t := time.Now(); time.Since(t) < framework.PodListTimeout; time.Sleep(framework.Poll) {
- selector := labels.SelectorFromSet(labels.Set(map[string]string{selectorKey: selectorValue}))
- options := api.ListOptions{LabelSelector: selector}
- podList, err := c.Pods(ns).List(options)
- Expect(err).NotTo(HaveOccurred())
- for _, pod := range podList.Items {
- if pod.Status.Phase == api.PodPending || pod.Status.Phase == api.PodRunning {
- pods = append(pods, &pod)
- }
- }
- if len(pods) > 0 {
- break
- }
- }
- if pods == nil || len(pods) == 0 {
- framework.Failf("No pods found")
- }
- for _, pod := range pods {
- err := framework.WaitForPodRunningInNamespace(c, pod.Name, ns)
- Expect(err).NotTo(HaveOccurred())
- fn(*pod)
- }
-}
diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go
index 97794eedec2..7cb857d2484 100644
--- a/test/e2e/framework/framework.go
+++ b/test/e2e/framework/framework.go
@@ -36,6 +36,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "k8s.io/kubernetes/pkg/labels"
+ "k8s.io/kubernetes/pkg/util/wait"
)
const (
@@ -423,3 +425,174 @@ func kubectlExec(namespace string, podName, containerName string, args ...string
func KubeDescribe(text string, body func()) bool {
return Describe("[k8s.io] "+text, body)
}
+
+// PodStateVerification represents a verification of pod state.
+// Any time you have a set of pods that you want to operate against or query,
+// this struct can be used to declaratively identify those pods.
+type PodStateVerification struct {
+ // Optional: only pods that have k=v labels will pass this filter.
+ Selectors map[string]string
+
+ // Required: The phases which are valid for your pod.
+ ValidPhases []api.PodPhase
+
+ // Optional: only pods passing this function will pass the filter
+ // Verify a pod.
+ // As an optimization, in addition to specfying filter (boolean),
+ // this function allows specifying an error as well.
+ // The error indicates that the polling of the pod spectrum should stop.
+ Verify func(api.Pod) (bool, error)
+
+ // Optional: only pods with this name will pass the filter.
+ PodName string
+}
+
+type ClusterVerification struct {
+ client *client.Client
+ namespace *api.Namespace // pointer rather than string, since ns isn't created until before each.
+ podState PodStateVerification
+}
+
+func (f *Framework) NewClusterVerification(filter PodStateVerification) *ClusterVerification {
+ return &ClusterVerification{
+ f.Client,
+ f.Namespace,
+ filter,
+ }
+}
+
+func passesPodNameFilter(pod api.Pod, name string) bool {
+ return name == "" || strings.Contains(pod.Name, name)
+}
+
+func passesVerifyFilter(pod api.Pod, verify func(p api.Pod) (bool, error)) (bool, error) {
+ if verify == nil {
+ return true, nil
+ } else {
+ verified, err := verify(pod)
+ // If an error is returned, by definition, pod verification fails
+ if err != nil {
+ return false, err
+ } else {
+ return verified, nil
+ }
+ }
+}
+
+func passesPhasesFilter(pod api.Pod, validPhases []api.PodPhase) bool {
+ passesPhaseFilter := false
+ for _, phase := range validPhases {
+ if pod.Status.Phase == phase {
+ passesPhaseFilter = true
+ }
+ }
+ return passesPhaseFilter
+}
+
+// filterLabels returns a list of pods which have labels.
+func filterLabels(selectors map[string]string, cli *client.Client, ns string) (*api.PodList, error) {
+ var err error
+ var selector labels.Selector
+ var pl *api.PodList
+ // List pods based on selectors. This might be a tiny optimization rather then filtering
+ // everything manually.
+ if len(selectors) > 0 {
+ selector = labels.SelectorFromSet(labels.Set(selectors))
+ options := api.ListOptions{LabelSelector: selector}
+ pl, err = cli.Pods(ns).List(options)
+ } else {
+ pl, err = cli.Pods(ns).List(api.ListOptions{})
+ }
+ return pl, err
+}
+
+// filter filters pods which pass a filter. It can be used to compose
+// the more useful abstractions like ForEach, WaitFor, and so on, which
+// can be used directly by tests.
+func (p *PodStateVerification) filter(c *client.Client, namespace *api.Namespace) ([]api.Pod, error) {
+ if len(p.ValidPhases) == 0 || namespace == nil {
+ panic(fmt.Errorf("Need to specify a valid pod phases (%v) and namespace (%v). ", p.ValidPhases, namespace))
+ }
+
+ ns := namespace.Name
+ pl, err := filterLabels(p.Selectors, c, ns) // Build an api.PodList to operate against.
+ Logf("Selector matched %v pods for %v", len(pl.Items), p.Selectors)
+ if len(pl.Items) == 0 || err != nil {
+ return pl.Items, err
+ }
+
+ unfilteredPods := pl.Items
+ filteredPods := []api.Pod{}
+ReturnPodsSoFar:
+ // Next: Pod must match at least one of the states that the user specified
+ for _, pod := range unfilteredPods {
+ if !(passesPhasesFilter(pod, p.ValidPhases) && passesPodNameFilter(pod, p.PodName)) {
+ continue
+ }
+ passesVerify, err := passesVerifyFilter(pod, p.Verify)
+ if err != nil {
+ Logf("Error detected on %v : %v !", pod.Name, err)
+ break ReturnPodsSoFar
+ }
+ if passesVerify {
+ filteredPods = append(filteredPods, pod)
+ }
+ }
+ return filteredPods, err
+}
+
+// WaitFor waits for some minimum number of pods to be verified, according to the PodStateVerification
+// definition.
+func (cl *ClusterVerification) WaitFor(atLeast int, timeout time.Duration) ([]api.Pod, error) {
+ pods := []api.Pod{}
+ var returnedErr error
+
+ err := wait.Poll(1*time.Second, timeout, func() (bool, error) {
+ pods, returnedErr = cl.podState.filter(cl.client, cl.namespace)
+
+ // Failure
+ if returnedErr != nil {
+ Logf("Cutting polling short: We got an error from the pod filtering layer.")
+ // stop polling if the pod filtering returns an error. that should never happen.
+ // it indicates, for example, that the client is broken or something non-pod related.
+ return false, returnedErr
+ }
+ Logf("Found %v / %v", len(pods), atLeast)
+
+ // Success
+ if len(pods) >= atLeast {
+ return true, nil
+ }
+ // Keep trying...
+ return false, nil
+ })
+ Logf("WaitFor completed. Pods found = %v out of %v", timeout, len(pods), atLeast)
+ return pods, err
+}
+
+// WaitForOrFail provides a shorthand WaitFor with failure as an option if anything goes wrong.
+func (cl *ClusterVerification) WaitForOrFail(atLeast int, timeout time.Duration) {
+ pods, err := cl.WaitFor(atLeast, timeout)
+ if err != nil || len(pods) < atLeast {
+ Failf("Verified %v of %v pods , error : %v", len(pods), atLeast, err)
+ }
+}
+
+// ForEach runs a function against every verifiable pod. Be warned that this doesn't wait for "n" pods to verifiy,
+// so it may return very quickly if you have strict pod state requirements.
+//
+// For example, if you require at least 5 pods to be running before your test will pass,
+// its smart to first call "clusterVerification.WaitFor(5)" before you call clusterVerification.ForEach.
+func (cl *ClusterVerification) ForEach(podFunc func(api.Pod)) error {
+ pods, err := cl.podState.filter(cl.client, cl.namespace)
+ if err == nil {
+ Logf("ForEach: Found %v pods from the filter. Now looping through them.", len(pods))
+ for _, p := range pods {
+ podFunc(p)
+ }
+ } else {
+ Logf("ForEach: Something went wrong when filtering pods to execute against: %v", err)
+ }
+
+ return err
+}
diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go
index d3fcaff2d05..caa8319f33b 100644
--- a/test/e2e/kubectl.go
+++ b/test/e2e/kubectl.go
@@ -115,6 +115,25 @@ var (
var _ = framework.KubeDescribe("Kubectl client", func() {
defer GinkgoRecover()
f := framework.NewDefaultFramework("kubectl")
+
+ // Reustable cluster state function. This won't be adversly affected by lazy initialization of framework.
+ clusterState := func() *framework.ClusterVerification {
+ return f.NewClusterVerification(
+ framework.PodStateVerification{
+ Selectors: map[string]string{"app": "redis"},
+ ValidPhases: []api.PodPhase{api.PodRunning /*api.PodPending*/},
+ })
+ }
+ // Customized Wait / ForEach wrapper for this test. These demonstrate the
+ // idiomatic way to wrap the ClusterVerification structs for syntactic sugar in large
+ // test files.
+ waitFor := func(atLeast int) {
+ // 60 seconds can be flakey for some of the containers.
+ clusterState().WaitFor(atLeast, 90*time.Second)
+ }
+ forEachPod := func(podFunc func(p api.Pod)) {
+ clusterState().ForEach(podFunc)
+ }
var c *client.Client
var ns string
BeforeEach(func() {
@@ -588,8 +607,10 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
framework.RunKubectlOrDie("create", "-f", serviceJson, nsFlag)
+ // Wait for the redis pods to come online...
+ waitFor(1)
// Pod
- forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+ forEachPod(func(pod api.Pod) {
output := framework.RunKubectlOrDie("describe", "pod", pod.Name, nsFlag)
requiredStrings := [][]string{
{"Name:", "redis-master-"},
@@ -684,8 +705,11 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
redisPort := 6379
By("creating Redis RC")
+
+ framework.Logf("namespace %v", ns)
framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
- forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+ forEachPod(func(pod api.Pod) {
+ framework.Logf("wait on %v ", ns)
framework.LookForStringInLog(ns, pod.Name, "redis-master", "The server is now ready to accept connections", framework.PodStartTimeout)
})
validateService := func(name string, servicePort int, timeout time.Duration) {
@@ -799,7 +823,7 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
It("should be able to retrieve and filter logs [Conformance]", func() {
framework.SkipUnlessServerVersionGTE(extendedPodLogFilterVersion, c)
- forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+ forEachPod(func(pod api.Pod) {
By("checking for a matching strings")
_, err := framework.LookForStringInLog(ns, pod.Name, containerName, "The server is now ready to accept connections", framework.PodStartTimeout)
Expect(err).NotTo(HaveOccurred())
@@ -850,12 +874,12 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
By("creating Redis RC")
framework.RunKubectlOrDie("create", "-f", controllerJson, nsFlag)
By("patching all pods")
- forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+ forEachPod(func(pod api.Pod) {
framework.RunKubectlOrDie("patch", "pod", pod.Name, nsFlag, "-p", "{\"metadata\":{\"annotations\":{\"x\":\"y\"}}}")
})
By("checking annotations")
- forEachPod(c, ns, "app", "redis", func(pod api.Pod) {
+ forEachPod(func(pod api.Pod) {
found := false
for key, val := range pod.Annotations {
if key == "x" && val == "y" {