From 4bfa389c1844f7ff94e55a8fd0934faa846d45bf Mon Sep 17 00:00:00 2001
From: Maciej Szulik
Date: Thu, 17 Sep 2015 21:35:25 +0200
Subject: [PATCH 1/2] Jobs e2e tests

---
 job.yaml        |  19 ++++
 test/e2e/job.go | 246 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 265 insertions(+)
 create mode 100644 job.yaml
 create mode 100644 test/e2e/job.go

diff --git a/job.yaml b/job.yaml
new file mode 100644
index 00000000000..789725e06c0
--- /dev/null
+++ b/job.yaml
@@ -0,0 +1,19 @@
+kind: Job
+apiVersion: v1
+metadata:
+  name: test-job
+spec:
+  parallelism: 2
+  completions: 5
+  selector:
+    name: test-job
+  template:
+    metadata:
+      labels:
+        name: test-job
+    spec:
+      restartPolicy: Never
+      containers:
+      - name: basic-pod
+        image: "centos:centos7"
+        command: ['false']
diff --git a/test/e2e/job.go b/test/e2e/job.go
new file mode 100644
index 00000000000..7ea4e149390
--- /dev/null
+++ b/test/e2e/job.go
@@ -0,0 +1,246 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"strconv"
+	"time"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/errors"
+	"k8s.io/kubernetes/pkg/apis/experimental"
+	client "k8s.io/kubernetes/pkg/client/unversioned"
+	"k8s.io/kubernetes/pkg/fields"
+	"k8s.io/kubernetes/pkg/kubectl"
+	"k8s.io/kubernetes/pkg/labels"
+	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/wait"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	// How long to wait for a job to finish.
+	jobTimeout = 5 * time.Minute
+
+	// Job selector name
+	jobSelector = "name"
+)
+
+var _ = Describe("Job", func() {
+	f := NewFramework("job")
+	parallelism := 2
+	completions := 8
+
+	It("should run a job", func() {
+		By("Creating a job")
+		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 1, parallelism, completions)
+		Expect(err).NotTo(HaveOccurred())
+		// Cleanup jobs when we are done.
+		defer func() {
+			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
+				Logf("Failed to cleanup job %v: %v.", job.Name, err)
+			}
+		}()
+
+		By("Ensuring active pods == parallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		Expect(err).NotTo(HaveOccurred())
+		By("Ensuring job shows active pods")
+		job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(job.Status.Active).To(BeNumerically(">", 0))
+		Expect(job.Status.Active).To(BeNumerically("<=", parallelism))
+
+		By("Ensuring job reaches completions")
+		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	It("should fail a job", func() {
+		By("Creating a job")
+		// negative timeout should fail the execution
+		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyNever, -1, parallelism, completions)
+		Expect(err).NotTo(HaveOccurred())
+		// Cleanup jobs when we are done.
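+		// The zero-second grace period deletes the job's pods immediately; a failed cleanup is only logged, so it cannot mask the test result.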
+		defer func() {
+			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
+				Logf("Failed to cleanup job %v: %v.", job.Name, err)
+			}
+		}()
+
+		By("Ensuring active pods == parallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring job shows failures")
+		job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
+		err = wait.Poll(poll, jobTimeout, func() (bool, error) {
+			curr, err := f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
+			if err != nil {
+				return false, err
+			}
+			return curr.Status.Unsuccessful > 0, nil
+		})
+	})
+
+	It("should scale a job up", func() {
+		newparallelism := 4
+		By("Creating a job")
+		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions)
+		Expect(err).NotTo(HaveOccurred())
+		// Cleanup jobs when we are done.
+		defer func() {
+			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
+				Logf("Failed to cleanup job %v: %v.", job.Name, err)
+			}
+		}()
+
+		By("Ensuring active pods == parallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("scale job up")
+		scaler, err := kubectl.ScalerFor("Job", f.Client)
+		Expect(err).NotTo(HaveOccurred())
+		waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
+		waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
+		err = scaler.Scale(f.Namespace.Name, job.Name, uint(newparallelism), nil, waitForScale, waitForReplicas)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring active pods == newparallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, newparallelism)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring job reaches completions")
+		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	It("should scale a job down", func() {
+		oldparallelism := 4
+		By("Creating a job")
+		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, oldparallelism, completions)
+		Expect(err).NotTo(HaveOccurred())
+		// Cleanup jobs when we are done.
+		defer func() {
+			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
+				Logf("Failed to cleanup job %v: %v.", job.Name, err)
+			}
+		}()
+
+		By("Ensuring active pods == oldparallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, oldparallelism)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("scale job down")
+		scaler, err := kubectl.ScalerFor("Job", f.Client)
+		Expect(err).NotTo(HaveOccurred())
+		waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
+		waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
+		err = scaler.Scale(f.Namespace.Name, job.Name, uint(parallelism), nil, waitForScale, waitForReplicas)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring active pods == parallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring job reaches completions")
+		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	It("should stop a job", func() {
+		By("Creating a job")
+		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring active pods == parallelism")
+		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("stop the job")
+		reaper, err := kubectl.ReaperFor("Job", f.Client)
+		Expect(err).NotTo(HaveOccurred())
+		timeout := 1 * time.Minute
+		_, err = reaper.Stop(f.Namespace.Name, job.Name, timeout, api.NewDeleteOptions(0))
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring job was deleted")
+		_, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
+		Expect(err).To(HaveOccurred())
+		Expect(errors.IsNotFound(err)).To(BeTrue())
+	})
+})
+
+func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, timeout, parallelism, completions int) (*experimental.Job, error) {
+	name := "job-" + string(util.NewUUID())
+	return c.Experimental().Jobs(ns).Create(&experimental.Job{
+		ObjectMeta: api.ObjectMeta{
+			Name: name,
+		},
+		Spec: experimental.JobSpec{
+			Parallelism: &parallelism,
+			Completions: &completions,
+			Template: &api.PodTemplateSpec{
+				ObjectMeta: api.ObjectMeta{
+					Labels: map[string]string{jobSelector: name},
+				},
+				Spec: api.PodSpec{
+					RestartPolicy: restartPolicy,
+					Containers: []api.Container{
+						{
+							Name:    name,
+							Image:   "gcr.io/google_containers/busybox",
+							Command: []string{"sleep", strconv.Itoa(timeout)},
+						},
+					},
+				},
+			},
+		},
+	})
+}
+
+// Wait for pods to become Running.
+func waitForJobRunning(c *client.Client, ns, jobName string, parallelism int) error {
+	label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelector: jobName}))
+	return wait.Poll(poll, jobTimeout, func() (bool, error) {
+		pods, err := c.Pods(ns).List(label, fields.Everything())
+		if err != nil {
+			return false, err
+		}
+		count := 0
+		for _, p := range pods.Items {
+			if p.Status.Phase == api.PodRunning {
+				count++
+			}
+		}
+		return count == parallelism, nil
+	})
+}
+
+// Wait for job to reach completions.
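+// Polls the job status every `poll` interval until Status.Successful equals the expected completions count, or jobTimeout elapses.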
+func waitForJobFinish(c *client.Client, ns, jobName string, completions int) error {
+	return wait.Poll(poll, jobTimeout, func() (bool, error) {
+		curr, err := c.Experimental().Jobs(ns).Get(jobName)
+		if err != nil {
+			return false, err
+		}
+		return curr.Status.Successful == completions, nil
+	})
+}

From 3c43c8dfa0b1bd8b27e3966faf5c529e20c3e011 Mon Sep 17 00:00:00 2001
From: Eric Tune
Date: Fri, 25 Sep 2015 09:31:06 -0700
Subject: [PATCH 2/2] Improve jobs e2e

Remove unnecessary yaml file.

Define 4 specific pod behaviors. (Sleeping for short periods is going
to be flaky during automated testing. Also, sleep -1 still exits 0.)

Don't wait for a certain number of active pods in tests where the pods
terminate after a finite time, since this is racy.

Change some tests to use pods that run forever, and not wait for
completion.

Add tests with local restarts.

Convert the DeleteOptions to the correct api group.
---
 job.yaml                       |  19 ----
 pkg/client/unversioned/jobs.go |   6 +-
 test/e2e/job.go                | 175 +++++++++++++++++----------------
 3 files changed, 96 insertions(+), 104 deletions(-)
 delete mode 100644 job.yaml

diff --git a/job.yaml b/job.yaml
deleted file mode 100644
index 789725e06c0..00000000000
--- a/job.yaml
+++ /dev/null
@@ -1,19 +0,0 @@
-kind: Job
-apiVersion: v1
-metadata:
-  name: test-job
-spec:
-  parallelism: 2
-  completions: 5
-  selector:
-    name: test-job
-  template:
-    metadata:
-      labels:
-        name: test-job
-    spec:
-      restartPolicy: Never
-      containers:
-      - name: basic-pod
-        image: "centos:centos7"
-        command: ['false']
diff --git a/pkg/client/unversioned/jobs.go b/pkg/client/unversioned/jobs.go
index 20a90f00f84..22ab6610e90 100644
--- a/pkg/client/unversioned/jobs.go
+++ b/pkg/client/unversioned/jobs.go
@@ -18,6 +18,7 @@ package unversioned

 import (
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/latest"
 	"k8s.io/kubernetes/pkg/apis/experimental"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
@@ -51,6 +52,9 @@ func newJobs(c *ExperimentalClient, namespace string) *jobs {
 	return &jobs{c, namespace}
 }

+// Ensure statically that jobs implements JobInterface.
+var _ JobInterface = &jobs{}
+
 // List returns a list of jobs that match the label and field selectors.
 func (c *jobs) List(label labels.Selector, field fields.Selector) (result *experimental.JobList, err error) {
 	result = &experimental.JobList{}
@@ -85,7 +89,7 @@ func (c *jobs) Delete(name string, options *api.DeleteOptions) (err error) {
 		return c.r.Delete().Namespace(c.ns).Resource("jobs").Name(name).Do().Error()
 	}

-	body, err := api.Scheme.EncodeToVersion(options, c.r.APIVersion())
+	body, err := api.Scheme.EncodeToVersion(options, latest.GroupOrDie("").GroupVersion)
 	if err != nil {
 		return err
 	}
diff --git a/test/e2e/job.go b/test/e2e/job.go
index 7ea4e149390..339e79710bd 100644
--- a/test/e2e/job.go
+++ b/test/e2e/job.go
@@ -17,7 +17,6 @@ limitations under the License.
 package e2e

 import (
-	"strconv"
 	"time"

 	"k8s.io/kubernetes/pkg/api"
@@ -27,7 +26,6 @@ import (
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/kubectl"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/wait"

 	. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo" @@ -39,80 +37,79 @@ const ( jobTimeout = 5 * time.Minute // Job selector name - jobSelector = "name" + jobSelectorKey = "job" ) var _ = Describe("Job", func() { f := NewFramework("job") parallelism := 2 - completions := 8 + completions := 4 + lotsOfFailures := 5 // more than completions - It("should run a job", func() { + // Simplest case: all pods succeed promptly + It("should run a job to completion when tasks succeed", func() { By("Creating a job") - job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 1, parallelism, completions) + job := newTestJob("succeed", "all-succeed", api.RestartPolicyNever, parallelism, completions) + job, err := createJob(f.Client, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) - // Cleanup jobs when we are done. - defer func() { - if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil { - Logf("Failed to cleanup job %v: %v.", job.Name, err) - } - }() - - By("Ensuring active pods == parallelism") - err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism) - Expect(err).NotTo(HaveOccurred()) - By("Ensuring job shows actibe pods") - job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name) - Expect(err).NotTo(HaveOccurred()) - Expect(job.Status.Active).To(BeNumerically(">", 0)) - Expect(job.Status.Active).To(BeNumerically("<=", parallelism)) By("Ensuring job reaches completions") err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions) Expect(err).NotTo(HaveOccurred()) }) - It("should fail a job", func() { + // Pods sometimes fail, but eventually succeed. + It("should run a job to completion when tasks sometimes fail and are locally restarted", func() { By("Creating a job") - // negative timeout should fail the execution - job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyNever, -1, parallelism, completions) - Expect(err).NotTo(HaveOccurred()) - // Cleanup jobs when we are done. - defer func() { - if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil { - Logf("Failed to cleanup job %v: %v.", job.Name, err) - } - }() - - By("Ensuring active pods == parallelism") - err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism) + // 50% chance of container success, local restarts. + job := newTestJob("randomlySucceedOrFail", "rand-local", api.RestartPolicyOnFailure, parallelism, completions) + job, err := createJob(f.Client, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) - By("Ensuring job shows failures") - job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name) + By("Ensuring job reaches completions") + err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions) + Expect(err).NotTo(HaveOccurred()) + }) + + // Pods sometimes fail, but eventually succeed, after pod restarts + It("should run a job to completion when tasks sometimes fail and are not locally restarted", func() { + By("Creating a job") + // 50% chance of container success, local restarts. 
+		job := newTestJob("randomlySucceedOrFail", "rand-non-local", api.RestartPolicyNever, parallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring job reaches completions")
+		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	It("should keep restarting failed pods", func() {
+		By("Creating a job")
+		job := newTestJob("fail", "all-fail", api.RestartPolicyNever, parallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Ensuring job shows many failures")
 		err = wait.Poll(poll, jobTimeout, func() (bool, error) {
 			curr, err := f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
 			if err != nil {
 				return false, err
 			}
-			return curr.Status.Unsuccessful > 0, nil
+			return curr.Status.Unsuccessful > lotsOfFailures, nil
 		})
 	})

 	It("should scale a job up", func() {
-		newparallelism := 4
+		startParallelism := 1
+		endParallelism := 2
 		By("Creating a job")
-		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions)
+		job := newTestJob("notTerminate", "scale-up", api.RestartPolicyNever, startParallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
-		// Cleanup jobs when we are done.
-		defer func() {
-			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
-				Logf("Failed to cleanup job %v: %v.", job.Name, err)
-			}
-		}()

-		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		By("Ensuring active pods == startParallelism")
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, startParallelism)
 		Expect(err).NotTo(HaveOccurred())

 		By("scale job up")
@@ -120,32 +117,24 @@ var _ = Describe("Job", func() {
 		Expect(err).NotTo(HaveOccurred())
 		waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
 		waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
-		err = scaler.Scale(f.Namespace.Name, job.Name, uint(newparallelism), nil, waitForScale, waitForReplicas)
+		err = scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
 		Expect(err).NotTo(HaveOccurred())

-		By("Ensuring active pods == newparallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, newparallelism)
-		Expect(err).NotTo(HaveOccurred())
-
-		By("Ensuring job reaches completions")
-		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		By("Ensuring active pods == endParallelism")
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, endParallelism)
 		Expect(err).NotTo(HaveOccurred())
 	})

 	It("should scale a job down", func() {
-		oldparallelism := 4
+		startParallelism := 2
+		endParallelism := 1
 		By("Creating a job")
-		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, oldparallelism, completions)
+		job := newTestJob("notTerminate", "scale-down", api.RestartPolicyNever, startParallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
-		// Cleanup jobs when we are done.
-		defer func() {
-			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
-				Logf("Failed to cleanup job %v: %v.", job.Name, err)
-			}
-		}()

-		By("Ensuring active pods == oldparallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, oldparallelism)
+		By("Ensuring active pods == startParallelism")
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, startParallelism)
 		Expect(err).NotTo(HaveOccurred())

 		By("scale job down")
@@ -153,25 +142,22 @@ var _ = Describe("Job", func() {
 		Expect(err).NotTo(HaveOccurred())
 		waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
 		waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
-		err = scaler.Scale(f.Namespace.Name, job.Name, uint(parallelism), nil, waitForScale, waitForReplicas)
+		err = scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
 		Expect(err).NotTo(HaveOccurred())

-		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
-		Expect(err).NotTo(HaveOccurred())
-
-		By("Ensuring job reaches completions")
-		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		By("Ensuring active pods == endParallelism")
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, endParallelism)
 		Expect(err).NotTo(HaveOccurred())
 	})

 	It("should stop a job", func() {
 		By("Creating a job")
-		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions)
+		job := newTestJob("notTerminate", "foo", api.RestartPolicyNever, parallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())

 		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
 		Expect(err).NotTo(HaveOccurred())

 		By("stop the job")
@@ -188,9 +174,9 @@ var _ = Describe("Job", func() {
 	})
 })

-func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, timeout, parallelism, completions int) (*experimental.Job, error) {
-	name := "job-" + string(util.NewUUID())
-	return c.Experimental().Jobs(ns).Create(&experimental.Job{
+// newTestJob returns a job which does one of several testing behaviors.
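+// The supported behaviors are "notTerminate", "fail", "succeed" and "randomlySucceedOrFail"; see the switch below.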
+func newTestJob(behavior, name string, rPol api.RestartPolicy, parallelism, completions int) *experimental.Job {
+	job := &experimental.Job{
 		ObjectMeta: api.ObjectMeta{
 			Name: name,
 		},
@@ -199,26 +185,47 @@ func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, tim
 			Completions: &completions,
 			Template: &api.PodTemplateSpec{
 				ObjectMeta: api.ObjectMeta{
-					Labels: map[string]string{jobSelector: name},
+					Labels: map[string]string{jobSelectorKey: name},
 				},
 				Spec: api.PodSpec{
-					RestartPolicy: restartPolicy,
+					RestartPolicy: rPol,
 					Containers: []api.Container{
 						{
-							Name:    name,
+							Name:    "c",
 							Image:   "gcr.io/google_containers/busybox",
-							Command: []string{"sleep", strconv.Itoa(timeout)},
+							Command: []string{},
 						},
 					},
 				},
 			},
 		},
-	})
+	}
+	switch behavior {
+	case "notTerminate":
+		job.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "1000000"}
+	case "fail":
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 1"}
+	case "succeed":
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 0"}
+	case "randomlySucceedOrFail":
+		// Bash's $RANDOM generates pseudorandom int in range 0 to 32767.
+		// Dividing by 16384 gives roughly 50/50 chance of success.
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit $(( $RANDOM / 16384 ))"}
+	}
+	return job
 }

-// Wait for pods to become Running.
-func waitForJobRunning(c *client.Client, ns, jobName string, parallelism int) error {
-	label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelector: jobName}))
+func createJob(c *client.Client, ns string, job *experimental.Job) (*experimental.Job, error) {
+	return c.Experimental().Jobs(ns).Create(job)
+}
+
+func deleteJob(c *client.Client, ns, name string) error {
+	return c.Experimental().Jobs(ns).Delete(name, api.NewDeleteOptions(0))
+}
+
+// Wait for all pods to become Running. Only use when pods will run for a long time, or it will be racy.
+func waitForAllPodsRunning(c *client.Client, ns, jobName string, parallelism int) error {
+	label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelectorKey: jobName}))
 	return wait.Poll(poll, jobTimeout, func() (bool, error) {
 		pods, err := c.Pods(ns).List(label, fields.Everything())
 		if err != nil {