Improve jobs e2e

Remove unnecessary yaml file.

Define 4 specific pod behaviors.
(sleeping for short periods is going to be flaky during automated
testing.  Also, sleep -1 still exits 0)

Don't wait for a certain number of active pods in tests
where the pods terminate after a finite time, since this is racy.

Changed some tests to use pods that run forever, and not wait
for completion.

Added tests with local restarts.

Convert the DeleteOptions to the correct api group.
This commit is contained in:
Eric Tune 2015-09-25 09:31:06 -07:00
parent 4bfa389c18
commit 3c43c8dfa0
3 changed files with 96 additions and 104 deletions

View File

@ -1,19 +0,0 @@
kind: Job
apiVersion: v1
metadata:
name: test-job
spec:
parallelism: 2
completions: 5
selector:
name: test-job
template:
metadata:
labels:
name: test-job
spec:
restartPolicy: Never
containers:
- name: basic-pod
image: "centos:centos7"
command: ['false']

View File

@ -18,6 +18,7 @@ package unversioned
import ( import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/apis/experimental" "k8s.io/kubernetes/pkg/apis/experimental"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
@ -51,6 +52,9 @@ func newJobs(c *ExperimentalClient, namespace string) *jobs {
return &jobs{c, namespace} return &jobs{c, namespace}
} }
// Ensure statically that jobs implements JobInterface.
var _ JobInterface = &jobs{}
// List returns a list of jobs that match the label and field selectors. // List returns a list of jobs that match the label and field selectors.
func (c *jobs) List(label labels.Selector, field fields.Selector) (result *experimental.JobList, err error) { func (c *jobs) List(label labels.Selector, field fields.Selector) (result *experimental.JobList, err error) {
result = &experimental.JobList{} result = &experimental.JobList{}
@ -85,7 +89,7 @@ func (c *jobs) Delete(name string, options *api.DeleteOptions) (err error) {
return c.r.Delete().Namespace(c.ns).Resource("jobs").Name(name).Do().Error() return c.r.Delete().Namespace(c.ns).Resource("jobs").Name(name).Do().Error()
} }
body, err := api.Scheme.EncodeToVersion(options, c.r.APIVersion()) body, err := api.Scheme.EncodeToVersion(options, latest.GroupOrDie("").GroupVersion)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,7 +17,6 @@ limitations under the License.
package e2e package e2e
import ( import (
"strconv"
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -27,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -39,80 +37,79 @@ const (
jobTimeout = 5 * time.Minute jobTimeout = 5 * time.Minute
// Job selector name // Job selector name
jobSelector = "name" jobSelectorKey = "job"
) )
var _ = Describe("Job", func() { var _ = Describe("Job", func() {
f := NewFramework("job") f := NewFramework("job")
parallelism := 2 parallelism := 2
completions := 8 completions := 4
lotsOfFailures := 5 // more than completions
It("should run a job", func() { // Simplest case: all pods succeed promptly
It("should run a job to completion when tasks succeed", func() {
By("Creating a job") By("Creating a job")
job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 1, parallelism, completions) job := newTestJob("succeed", "all-succeed", api.RestartPolicyNever, parallelism, completions)
job, err := createJob(f.Client, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Cleanup jobs when we are done.
defer func() {
if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
Logf("Failed to cleanup job %v: %v.", job.Name, err)
}
}()
By("Ensuring active pods == parallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job shows actibe pods")
job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
Expect(err).NotTo(HaveOccurred())
Expect(job.Status.Active).To(BeNumerically(">", 0))
Expect(job.Status.Active).To(BeNumerically("<=", parallelism))
By("Ensuring job reaches completions") By("Ensuring job reaches completions")
err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions) err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should fail a job", func() { // Pods sometimes fail, but eventually succeed.
It("should run a job to completion when tasks sometimes fail and are locally restarted", func() {
By("Creating a job") By("Creating a job")
// negative timeout should fail the execution // 50% chance of container success, local restarts.
job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyNever, -1, parallelism, completions) job := newTestJob("randomlySucceedOrFail", "rand-local", api.RestartPolicyOnFailure, parallelism, completions)
Expect(err).NotTo(HaveOccurred()) job, err := createJob(f.Client, f.Namespace.Name, job)
// Cleanup jobs when we are done.
defer func() {
if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
Logf("Failed to cleanup job %v: %v.", job.Name, err)
}
}()
By("Ensuring active pods == parallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Ensuring job shows failures") By("Ensuring job reaches completions")
job, err = f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name) err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred())
})
// Pods sometimes fail, but eventually succeed, after pod restarts
It("should run a job to completion when tasks sometimes fail and are not locally restarted", func() {
By("Creating a job")
// 50% chance of container success, local restarts.
job := newTestJob("randomlySucceedOrFail", "rand-non-local", api.RestartPolicyNever, parallelism, completions)
job, err := createJob(f.Client, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job reaches completions")
err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred())
})
It("should keep restarting failed pods", func() {
By("Creating a job")
job := newTestJob("fail", "all-fail", api.RestartPolicyNever, parallelism, completions)
job, err := createJob(f.Client, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job shows many failures")
err = wait.Poll(poll, jobTimeout, func() (bool, error) { err = wait.Poll(poll, jobTimeout, func() (bool, error) {
curr, err := f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name) curr, err := f.Client.Experimental().Jobs(f.Namespace.Name).Get(job.Name)
if err != nil { if err != nil {
return false, err return false, err
} }
return curr.Status.Unsuccessful > 0, nil return curr.Status.Unsuccessful > lotsOfFailures, nil
}) })
}) })
It("should scale a job up", func() { It("should scale a job up", func() {
newparallelism := 4 startParallelism := 1
endParallelism := 2
By("Creating a job") By("Creating a job")
job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions) job := newTestJob("notTerminate", "scale-up", api.RestartPolicyNever, startParallelism, completions)
job, err := createJob(f.Client, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Cleanup jobs when we are done.
defer func() {
if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
Logf("Failed to cleanup job %v: %v.", job.Name, err)
}
}()
By("Ensuring active pods == parallelism") By("Ensuring active pods == startParallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism) err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, startParallelism)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("scale job up") By("scale job up")
@ -120,32 +117,24 @@ var _ = Describe("Job", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute) waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute) waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
scaler.Scale(f.Namespace.Name, job.Name, uint(newparallelism), nil, waitForScale, waitForReplicas) scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Ensuring active pods == newparallelism") By("Ensuring active pods == endParallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, newparallelism) err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, endParallelism)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job reaches completions")
err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should scale a job down", func() { It("should scale a job down", func() {
oldparallelism := 4 startParallelism := 2
endParallelism := 1
By("Creating a job") By("Creating a job")
job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, oldparallelism, completions) job := newTestJob("notTerminate", "scale-down", api.RestartPolicyNever, startParallelism, completions)
job, err := createJob(f.Client, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Cleanup jobs when we are done.
defer func() {
if err := f.Client.Experimental().Jobs(f.Namespace.Name).Delete(job.Name, api.NewDeleteOptions(0)); err != nil {
Logf("Failed to cleanup job %v: %v.", job.Name, err)
}
}()
By("Ensuring active pods == oldparallelism") By("Ensuring active pods == startParallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, oldparallelism) err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, startParallelism)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("scale job down") By("scale job down")
@ -153,25 +142,22 @@ var _ = Describe("Job", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute) waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute) waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
err = scaler.Scale(f.Namespace.Name, job.Name, uint(parallelism), nil, waitForScale, waitForReplicas) err = scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Ensuring active pods == parallelism") By("Ensuring active pods == endParallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism) err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, endParallelism)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job reaches completions")
err = waitForJobFinish(f.Client, f.Namespace.Name, job.Name, completions)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
It("should stop a job", func() { It("should stop a job", func() {
By("Creating a job") By("Creating a job")
job, err := createJob(f.Client, f.Namespace.Name, api.RestartPolicyOnFailure, 10, parallelism, completions) job := newTestJob("notTerminate", "foo", api.RestartPolicyNever, parallelism, completions)
job, err := createJob(f.Client, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("Ensuring active pods == parallelism") By("Ensuring active pods == parallelism")
err = waitForJobRunning(f.Client, f.Namespace.Name, job.Name, parallelism) err = waitForAllPodsRunning(f.Client, f.Namespace.Name, job.Name, parallelism)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
By("scale job down") By("scale job down")
@ -188,9 +174,9 @@ var _ = Describe("Job", func() {
}) })
}) })
func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, timeout, parallelism, completions int) (*experimental.Job, error) { // newTestJob returns a job which does one of several testing behaviors.
name := "job-" + string(util.NewUUID()) func newTestJob(behavior, name string, rPol api.RestartPolicy, parallelism, completions int) *experimental.Job {
return c.Experimental().Jobs(ns).Create(&experimental.Job{ job := &experimental.Job{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: name, Name: name,
}, },
@ -199,26 +185,47 @@ func createJob(c *client.Client, ns string, restartPolicy api.RestartPolicy, tim
Completions: &completions, Completions: &completions,
Template: &api.PodTemplateSpec{ Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Labels: map[string]string{jobSelector: name}, Labels: map[string]string{jobSelectorKey: name},
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
RestartPolicy: restartPolicy, RestartPolicy: rPol,
Containers: []api.Container{ Containers: []api.Container{
{ {
Name: name, Name: "c",
Image: "gcr.io/google_containers/busybox", Image: "gcr.io/google_containers/busybox",
Command: []string{"sleep", strconv.Itoa(timeout)}, Command: []string{},
}, },
}, },
}, },
}, },
}, },
}) }
switch behavior {
case "notTerminate":
job.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "1000000"}
case "fail":
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 1"}
case "succeed":
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 0"}
case "randomlySucceedOrFail":
// Bash's $RANDOM generates pseudorandom int in range 0 - 32767.
// Dividing by 16384 gives roughly 50/50 chance of succeess.
job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit $(( $RANDOM / 16384 ))"}
}
return job
} }
// Wait for pods to become Running. func createJob(c *client.Client, ns string, job *experimental.Job) (*experimental.Job, error) {
func waitForJobRunning(c *client.Client, ns, jobName string, parallelism int) error { return c.Experimental().Jobs(ns).Create(job)
label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelector: jobName})) }
func deleteJob(c *client.Client, ns, name string) error {
return c.Experimental().Jobs(ns).Delete(name, api.NewDeleteOptions(0))
}
// Wait for all pods to become Running. Only use when pods will run for a long time, or it will be racy.
func waitForAllPodsRunning(c *client.Client, ns, jobName string, parallelism int) error {
label := labels.SelectorFromSet(labels.Set(map[string]string{jobSelectorKey: jobName}))
return wait.Poll(poll, jobTimeout, func() (bool, error) { return wait.Poll(poll, jobTimeout, func() (bool, error) {
pods, err := c.Pods(ns).List(label, fields.Everything()) pods, err := c.Pods(ns).List(label, fields.Everything())
if err != nil { if err != nil {