diff --git a/job.yaml b/job.yaml
deleted file mode 100644
index 789725e06c0..00000000000
--- a/job.yaml
+++ /dev/null
@@ -1,19 +0,0 @@
-kind: Job
-apiVersion: v1
-metadata:
-  name: test-job
-spec:
-  parallelism: 2
-  completions: 5
-  selector:
-    name: test-job
-  template:
-    metadata:
-      labels:
-        name: test-job
-    spec:
-      restartPolicy: Never
-      containers:
-      - name: basic-pod
-        image: "centos:centos7"
-        command: ['false']
diff --git a/pkg/client/unversioned/jobs.go b/pkg/client/unversioned/jobs.go
index 20a90f00f84..22ab6610e90 100644
--- a/pkg/client/unversioned/jobs.go
+++ b/pkg/client/unversioned/jobs.go
@@ -18,6 +18,7 @@ package unversioned

 import (
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/latest"
 	"k8s.io/kubernetes/pkg/apis/experimental"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
@@ -51,6 +52,9 @@ func newJobs(c *ExperimentalClient, namespace string) *jobs {
 	return &jobs{c, namespace}
 }

+// Ensure statically that jobs implements JobInterface.
+var _ JobInterface = &jobs{}
+
 // List returns a list of jobs that match the label and field selectors.
 func (c *jobs) List(label labels.Selector, field fields.Selector) (result *experimental.JobList, err error) {
 	result = &experimental.JobList{}
@@ -85,7 +89,7 @@ func (c *jobs) Delete(name string, options *api.DeleteOptions) (err error) {
 		return c.r.Delete().Namespace(c.ns).Resource("jobs").Name(name).Do().Error()
 	}
-	body, err := api.Scheme.EncodeToVersion(options, c.r.APIVersion())
+	body, err := api.Scheme.EncodeToVersion(options, latest.GroupOrDie("").GroupVersion)
 	if err != nil {
 		return err
 	}
diff --git a/test/e2e/job.go b/test/e2e/job.go
index 7ea4e149390..339e79710bd 100644
--- a/test/e2e/job.go
+++ b/test/e2e/job.go
@@ -17,7 +17,6 @@ limitations under the License.
 package e2e

 import (
-	"strconv"
 	"time"

 	"k8s.io/kubernetes/pkg/api"
@@ -27,7 +26,6 @@ import (
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/kubectl"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/wait"

 	. "github.com/onsi/ginkgo"
@@ -39,80 +37,79 @@ const (
 	jobTimeout = 5 * time.Minute

 	// Job selector name
-	jobSelector = "name"
+	jobSelectorKey = "job"
 )

 var _ = Describe("Job", func() {
 	f := NewFramework("job")
 	parallelism := 2
-	completions := 8
+	completions := 4
+	lotsOfFailures := 5 // more than completions

-	It("should run a job", func() {
+	// Simplest case: all pods succeed promptly
+	It("should run a job to completion when tasks succeed", func() {
 		By("Creating a job")
-		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 1, parallelism, completions)
+		job := newTestJob("succeed", "all-succeed", api.RestartPolicyNever, parallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
-		// Cleanup jobs when we are done.
-		defer func() {
-			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
-				Logf("Failed to cleanup job %v: %v.", job.Name, err)
-			}
-		}()
-
-		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
-		Expect(err).NotTo(HaveOccurred())
-		By("Ensuring job shows actibe pods")
-		job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
-		Expect(err).NotTo(HaveOccurred())
-		Expect(job.Status.Active).To(BeNumerically(">", 0))
-		Expect(job.Status.Active).To(BeNumerically("<=", parallelism))

 		By("Ensuring job reaches completions")
 		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
 		Expect(err).NotTo(HaveOccurred())
 	})

-	It("should fail a job", func() {
+	// Pods sometimes fail, but eventually succeed.
+	It("should run a job to completion when tasks sometimes fail and are locally restarted", func() {
 		By("Creating a job")
-		// negative timeout should fail the execution
-		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyNever, -1, parallelism, completions)
-		Expect(err).NotTo(HaveOccurred())
-		// Cleanup jobs when we are done.
-		defer func() {
-			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
-				Logf("Failed to cleanup job %v: %v.", job.Name, err)
-			}
-		}()
-
-		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		// 50% chance of container success, local restarts.
+		job := newTestJob("randomlySucceedOrFail", "rand-local", api.RestartPolicyOnFailure, parallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())

-		By("Ensuring job shows failures")
-		job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
+		By("Ensuring job reaches completions")
+		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+	// Pods sometimes fail, but eventually succeed, after pod restarts
+	It("should run a job to completion when tasks sometimes fail and are not locally restarted", func() {
+		By("Creating a job")
+		// 50% chance of container success, no local restarts.
+ job := newTestJob("randomlySucceedOrFail", "rand-non-local", api.RestartPolicyNever, parallelism, completions) + job, err := createJob(f.Client, f.Namespace.Name, job) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring job reaches completions") + err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should keep restarting failed pods", func() { + By("Creating a job") + job := newTestJob("fail", "all-fail", api.RestartPolicyNever, parallelism, completions) + job, err := createJob(f.Client, f.Namespace.Name, job) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring job shows many failures") err = wait.Poll(poll, jobTimeout, func() (bool, error) { curr, err := f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name) if err != nil { return false, err } - return curr.Status.Unsuccessful > 0, nil + return curr.Status.Unsuccessful > lotsOfFailures, nil }) }) It("should scale a job up", func() { - newparallelism := 4 + startParallelism := 1 + endParallelism := 2 By("Creating a job") - job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions) + job := newTestJob("notTerminate", "scale-up", api.RestartPolicyNever, startParallelism, completions) + job, err := createJob(f.Client, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) - // Cleanup jobs when we are done. - defer func() { - if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil { - Logf("Failed to cleanup job %v: %v.", job.Name, err) - } - }() - By("Ensuring active pods == parallelism") - err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism) + By("Ensuring active pods == startParallelism") + err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, startParallelism) Expect(err).NotTo(HaveOccurred()) By("scale job up") @@ -120,32 +117,24 @@ var _ = Describe("Job", func() { Expect(err).NotTo(HaveOccurred()) waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute) waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute) - scaler.Scale(f.Namespace.Name, job.Name, uint(newparallelism), nil, waitForScale, waitForReplicas) + scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas) Expect(err).NotTo(HaveOccurred()) - By("Ensuring active pods == newparallelism") - err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, newparallelism) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring job reaches completions") - err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions) + By("Ensuring active pods == endParallelism") + err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, endParallelism) Expect(err).NotTo(HaveOccurred()) }) It("should scale a job down", func() { - oldparallelism := 4 + startParallelism := 2 + endParallelism := 1 By("Creating a job") - job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, oldparallelism, completions) + job := newTestJob("notTerminate", "scale-down", api.RestartPolicyNever, startParallelism, completions) + job, err := createJob(f.Client, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) - // Cleanup jobs when we are done. 
-		defer func() {
-			if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
-				Logf("Failed to cleanup job %v: %v.", job.Name, err)
-			}
-		}()

-		By("Ensuring active pods == oldparallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, oldparallelism)
+		By("Ensuring active pods == startParallelism")
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, startParallelism)
 		Expect(err).NotTo(HaveOccurred())

 		By("scale job down")
@@ -153,25 +142,22 @@ var _ = Describe("Job", func() {
 		Expect(err).NotTo(HaveOccurred())
 		waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
 		waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
-		err = scaler.Scale(f.Namespace.Name, job.Name, uint(parallelism), nil, waitForScale, waitForReplicas)
+		err = scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
 		Expect(err).NotTo(HaveOccurred())

-		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
-		Expect(err).NotTo(HaveOccurred())
-
-		By("Ensuring job reaches completions")
-		err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
+		By("Ensuring active pods == endParallelism")
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, endParallelism)
 		Expect(err).NotTo(HaveOccurred())
 	})

 	It("should stop a job", func() {
 		By("Creating a job")
-		job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions)
+		job := newTestJob("notTerminate", "foo", api.RestartPolicyNever, parallelism, completions)
+		job, err := createJob(f.Client, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())

 		By("Ensuring active pods == parallelism")
-		err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
+		err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
 		Expect(err).NotTo(HaveOccurred())

 		By("scale job down")
@@ -188,9 +174,9 @@ var _ = Describe("Job", func() {
 	})
 })

-func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, timeout, parallelism, completions int) (*experimental.Job, error) {
-	name := "job-" + string(util.NewUUID())
-	return c.Experimental().Jobs(ns).Create(&experimental.Job{
+// newTestJob returns a job which does one of several testing behaviors.
+func newTestJob(behavior, name string, rPol api.RestartPolicy, parallelism, completions int) *experimental.Job {
+	job := &experimental.Job{
 		ObjectMeta: api.ObjectMeta{
 			Name: name,
 		},
@@ -199,26 +185,47 @@ func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, tim
 			Completions: &completions,
 			Template: &api.PodTemplateSpec{
 				ObjectMeta: api.ObjectMeta{
-					Labels: map[string]string{jobSelector: name},
+					Labels: map[string]string{jobSelectorKey: name},
 				},
 				Spec: api.PodSpec{
-					RestartPolicy: restartPolicy,
+					RestartPolicy: rPol,
 					Containers: []api.Container{
 						{
-							Name:    name,
+							Name:    "c",
 							Image:   "gcr.io/google_containers/busybox",
-							Command: []string{"sleep", strconv.Itoa(timeout)},
+							Command: []string{},
 						},
 					},
 				},
 			},
 		},
-	})
+	}
+	switch behavior {
+	case "notTerminate":
+		job.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "1000000"}
+	case "fail":
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 1"}
+	case "succeed":
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 0"}
+	case "randomlySucceedOrFail":
+		// Bash's $RANDOM generates pseudorandom int in range 0 - 32767.
+		// Dividing by 16384 gives roughly 50/50 chance of success.
+		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit $(( $RANDOM / 16384 ))"}
+	}
+	return job
 }

-// Wait for pods to become Running.
-func waitForJobRunning(c *client.Client, ns, jobName string, parallelism int) error {
-	label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelector: jobName}))
+func createJob(c *client.Client, ns string, job *experimental.Job) (*experimental.Job, error) {
+	return c.Experimental().Jobs(ns).Create(job)
+}
+
+func deleteJob(c *client.Client, ns, name string) error {
+	return c.Experimental().Jobs(ns).Delete(name, api.NewDeleteOptions(0))
+}
+
+// Wait for all pods to become Running. Only use when pods will run for a long time, or it will be racy.
+func waitForAllPodsRunning(c *client.Client, ns, jobName string, parallelism int) error {
+	label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelectorKey: jobName}))
 	return wait.Poll(poll, jobTimeout, func() (bool, error) {
 		pods, err := c.Pods(ns).List(label, fields.Everything())
 		if err != nil {
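Usage note: the helpers this patch introduces (newTestJob, createJob, deleteJob, waitForJobFinish) compose into a compact pattern for further specs. The sketch below is illustrative only and not part of the patch; it assumes placement inside the Describe("Job", ...) block above with f := NewFramework("job") in scope, and the behavior string, job name, and counts are arbitrary examples.

// Sketch (not in the patch): one more spec built from the new helpers.
It("should run a one-off job", func() {
	By("Creating a job")
	// "succeed" makes the container exit 0; parallelism and completions of 1.
	job := newTestJob("succeed", "one-off", api.RestartPolicyNever, 1, 1)
	job, err := createJob(f.Client, f.Namespace.Name, job)
	Expect(err).NotTo(HaveOccurred())
	defer deleteJob(f.Client, f.Namespace.Name, job.Name)

	By("Ensuring job reaches completions")
	err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, 1)
	Expect(err).NotTo(HaveOccurred())
})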