Speed up job e2e tests when waiting for failure

The job controller synchronizes a job only when the job itself or one of its
pods changes, or when the full resync runs once every 10 minutes. Because of
that, the e2e test waiting for a job to fail sometimes had to sit out most of
that 10-minute interval. This change adds a job modification that triggers a
resync whenever the job has not terminated within a shorter timeout.
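
Both test files get the same treatment. A condensed restatement of the pattern
the diff below applies (the helper names are the ones introduced in the diff;
Ginkgo/Gomega scaffolding is elided):

    // Wait only briefly for the job to fail; in the common case the active
    // deadline fires and the controller syncs the job promptly.
    err = waitForV1JobFail(f.Client, f.Namespace.Name, job.Name, 20*time.Second)
    if err == wait.ErrWaitTimeout {
        // The job stabilized: no further sync happens until a modification
        // or the 10-minute full resync. Touch the spec to force a sync,
        // then fall back to the full timeout.
        job, err = getV1Job(f.Client, f.Namespace.Name, job.Name)
        Expect(err).NotTo(HaveOccurred())
        job.Spec.Parallelism = &completions
        job, err = updateV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())
        err = waitForV1JobFail(f.Client, f.Namespace.Name, job.Name, v1JobTimeout)
    }
    Expect(err).NotTo(HaveOccurred())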
Maciej Szulik 2016-09-02 15:47:01 +02:00
parent 2fe1fcdab8
commit 393d551332
2 changed files with 49 additions and 11 deletions


@@ -107,7 +107,7 @@ var _ = framework.KubeDescribe("V1Job", func() {
 			By("Ensuring job shows many failures")
 			err = wait.Poll(framework.Poll, v1JobTimeout, func() (bool, error) {
-				curr, err := f.Client.Batch().Jobs(f.Namespace.Name).Get(job.Name)
+				curr, err := getV1Job(f.Client, f.Namespace.Name, job.Name)
 				if err != nil {
 					return false, err
 				}
@@ -183,12 +183,12 @@ var _ = framework.KubeDescribe("V1Job", func() {
 			Expect(err).NotTo(HaveOccurred())

 			By("Ensuring job was deleted")
-			_, err = f.Client.Batch().Jobs(f.Namespace.Name).Get(job.Name)
+			_, err = getV1Job(f.Client, f.Namespace.Name, job.Name)
 			Expect(err).To(HaveOccurred())
 			Expect(errors.IsNotFound(err)).To(BeTrue())
 		})

-		It("should fail a job [Slow]", func() {
+		It("should fail a job", func() {
 			By("Creating a job")
 			job := newTestV1Job("notTerminate", "foo", api.RestartPolicyNever, parallelism, completions)
 			activeDeadlineSeconds := int64(10)
@@ -197,7 +197,18 @@ var _ = framework.KubeDescribe("V1Job", func() {
 			Expect(err).NotTo(HaveOccurred())

 			By("Ensuring job was failed")
-			err = waitForV1JobFail(f.Client, f.Namespace.Name, job.Name)
+			err = waitForV1JobFail(f.Client, f.Namespace.Name, job.Name, 20*time.Second)
+			if err == wait.ErrWaitTimeout {
+				job, err = getV1Job(f.Client, f.Namespace.Name, job.Name)
+				Expect(err).NotTo(HaveOccurred())
+				// The job stabilized and won't be synced until a modification or
+				// a full resync happens; we don't want to wait for the latter,
+				// so we force a sync by modifying the job.
+				job.Spec.Parallelism = &completions
+				job, err = updateV1Job(f.Client, f.Namespace.Name, job)
+				Expect(err).NotTo(HaveOccurred())
+				err = waitForV1JobFail(f.Client, f.Namespace.Name, job.Name, v1JobTimeout)
+			}
 			Expect(err).NotTo(HaveOccurred())
 		})
 	})
@@ -264,10 +275,18 @@ func newTestV1Job(behavior, name string, rPol api.RestartPolicy, parallelism, co
 	return job
 }

+func getV1Job(c *client.Client, ns, name string) (*batch.Job, error) {
+	return c.Batch().Jobs(ns).Get(name)
+}
+
 func createV1Job(c *client.Client, ns string, job *batch.Job) (*batch.Job, error) {
 	return c.Batch().Jobs(ns).Create(job)
 }

+func updateV1Job(c *client.Client, ns string, job *batch.Job) (*batch.Job, error) {
+	return c.Batch().Jobs(ns).Update(job)
+}
+
 func deleteV1Job(c *client.Client, ns, name string) error {
 	return c.Batch().Jobs(ns).Delete(name, api.NewDeleteOptions(0))
 }
@@ -303,8 +322,8 @@ func waitForV1JobFinish(c *client.Client, ns, jobName string, completions int32)
 }

 // Wait for job fail.
-func waitForV1JobFail(c *client.Client, ns, jobName string) error {
-	return wait.Poll(framework.Poll, v1JobTimeout, func() (bool, error) {
+func waitForV1JobFail(c *client.Client, ns, jobName string, timeout time.Duration) error {
+	return wait.Poll(framework.Poll, timeout, func() (bool, error) {
 		curr, err := c.Batch().Jobs(ns).Get(jobName)
 		if err != nil {
 			return false, err

@@ -103,7 +103,7 @@ var _ = framework.KubeDescribe("Job", func() {
 			By("Ensuring job shows many failures")
 			err = wait.Poll(framework.Poll, jobTimeout, func() (bool, error) {
-				curr, err := f.Client.Extensions().Jobs(f.Namespace.Name).Get(job.Name)
+				curr, err := getJob(f.Client, f.Namespace.Name, job.Name)
 				if err != nil {
 					return false, err
 				}
@@ -179,7 +179,7 @@ var _ = framework.KubeDescribe("Job", func() {
 			Expect(err).NotTo(HaveOccurred())

 			By("Ensuring job was deleted")
-			_, err = f.Client.Extensions().Jobs(f.Namespace.Name).Get(job.Name)
+			_, err = getJob(f.Client, f.Namespace.Name, job.Name)
 			Expect(err).To(HaveOccurred())
 			Expect(errors.IsNotFound(err)).To(BeTrue())
 		})
@@ -193,7 +193,18 @@ var _ = framework.KubeDescribe("Job", func() {
 			Expect(err).NotTo(HaveOccurred())

 			By("Ensuring job was failed")
-			err = waitForJobFail(f.Client, f.Namespace.Name, job.Name)
+			err = waitForJobFail(f.Client, f.Namespace.Name, job.Name, 20*time.Second)
+			if err == wait.ErrWaitTimeout {
+				job, err = getJob(f.Client, f.Namespace.Name, job.Name)
+				Expect(err).NotTo(HaveOccurred())
+				// The job stabilized and won't be synced until a modification or
+				// a full resync happens; we don't want to wait for the latter,
+				// so we force a sync by modifying the job.
+				job.Spec.Parallelism = &completions
+				job, err = updateJob(f.Client, f.Namespace.Name, job)
+				Expect(err).NotTo(HaveOccurred())
+				err = waitForJobFail(f.Client, f.Namespace.Name, job.Name, jobTimeout)
+			}
 			Expect(err).NotTo(HaveOccurred())
 		})
 	})
@@ -261,10 +272,18 @@ func newTestJob(behavior, name string, rPol api.RestartPolicy, parallelism, comp
 	return job
 }

+func getJob(c *client.Client, ns, name string) (*batch.Job, error) {
+	return c.Extensions().Jobs(ns).Get(name)
+}
+
 func createJob(c *client.Client, ns string, job *batch.Job) (*batch.Job, error) {
 	return c.Extensions().Jobs(ns).Create(job)
 }

+func updateJob(c *client.Client, ns string, job *batch.Job) (*batch.Job, error) {
+	return c.Extensions().Jobs(ns).Update(job)
+}
+
 func deleteJob(c *client.Client, ns, name string) error {
 	return c.Extensions().Jobs(ns).Delete(name, nil)
 }
@@ -300,8 +319,8 @@ func waitForJobFinish(c *client.Client, ns, jobName string, completions int32) e
 }

 // Wait for job fail.
-func waitForJobFail(c *client.Client, ns, jobName string) error {
-	return wait.Poll(framework.Poll, jobTimeout, func() (bool, error) {
+func waitForJobFail(c *client.Client, ns, jobName string, timeout time.Duration) error {
+	return wait.Poll(framework.Poll, timeout, func() (bool, error) {
 		curr, err := c.Extensions().Jobs(ns).Get(jobName)
 		if err != nil {
 			return false, err
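
Why modifying the job forces a sync: the controller watches jobs through an
informer, and any observed update enqueues the job for immediate processing;
without an event, a job is only revisited by the periodic full resync (10
minutes here). A hedged sketch of that wiring, written against the modern
client-go handler names rather than the controller's exact code of this era
(jobInformer and enqueueJob are stand-in names):

    jobInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { enqueueJob(obj) },
        UpdateFunc: func(old, cur interface{}) {
            // Every observed modification lands here, including the test's
            // Spec.Parallelism bump, and schedules a sync right away
            // instead of waiting out the full resync interval.
            enqueueJob(cur)
        },
        DeleteFunc: func(obj interface{}) { enqueueJob(obj) },
    })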