diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 2a239a70327..d1e24fc5ac8 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -82,10 +82,10 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")}) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")}) - if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.Core().RESTClient().GetRateLimiter()) + if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) } jm := &JobController{ @@ -100,12 +100,8 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: jm.enqueueController, - UpdateFunc: func(old, cur interface{}) { - if job := cur.(*batch.Job); !IsJobFinished(job) { - jm.enqueueController(job) - } - }, + AddFunc: jm.enqueueController, + UpdateFunc: jm.updateJob, DeleteFunc: jm.enqueueController, }) jm.jobLister = jobInformer.Lister() @@ -306,6 +302,35 @@ func (jm *JobController) deletePod(obj interface{}) { jm.enqueueController(job) } +func (jm *JobController) updateJob(old, cur interface{}) { + oldJob := old.(*batch.Job) + curJob := cur.(*batch.Job) + + // never return error + key, err := controller.KeyFunc(curJob) + if err != nil { + return + } + jm.queue.Add(key) + // check if need 
to add a new resync for ActiveDeadlineSeconds + if curJob.Status.StartTime != nil { + curADS := curJob.Spec.ActiveDeadlineSeconds + if curADS == nil { + return + } + oldADS := oldJob.Spec.ActiveDeadlineSeconds + if oldADS == nil || *oldADS != *curADS { + now := metav1.Now() + start := curJob.Status.StartTime.Time + passed := now.Time.Sub(start) + total := time.Duration(*curADS) * time.Second + // AddAfter will handle total < passed + jm.queue.AddAfter(key, total-passed) + glog.V(4).Infof("job ActiveDeadlineSeconds updated, will resync after %v", total-passed) + } + } +} + // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item. func (jm *JobController) enqueueController(obj interface{}) { key, err := controller.KeyFunc(obj) @@ -420,9 +445,16 @@ func (jm *JobController) syncJob(key string) error { active := int32(len(activePods)) succeeded, failed := getStatus(pods) conditions := len(job.Status.Conditions) + // job first start if job.Status.StartTime == nil { now := metav1.Now() job.Status.StartTime = &now + // enqueue a sync to check if job past ActiveDeadlineSeconds + if job.Spec.ActiveDeadlineSeconds != nil { + glog.V(4).Infof("Job %s has ActiveDeadlineSeconds, will sync after %d seconds", + key, *job.Spec.ActiveDeadlineSeconds) + jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second) + } } // if job was finished previously, we don't want to redo the termination if IsJobFinished(&job) { diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 7180f9a7ad6..939c11ce7c6 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -39,7 +39,7 @@ var _ = SIGDescribe("Job", func() { // Simplest case: all pods succeed promptly It("should run a job to completion when tasks succeed", func() { By("Creating a job") - job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions) + job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, 
completions, nil) job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) @@ -58,7 +58,7 @@ var _ = SIGDescribe("Job", func() { // up to 5 minutes between restarts, making test timeouts // due to successive failures too likely with a reasonable // test timeout. - job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions) + job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil) job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) @@ -76,7 +76,7 @@ var _ = SIGDescribe("Job", func() { // Worst case analysis: 15 failures, each taking 1 minute to // run due to some slowness, 1 in 2^15 chance of happening, // causing test flake. Should be very rare. - job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions) + job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil) job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) @@ -85,9 +85,20 @@ var _ = SIGDescribe("Job", func() { Expect(err).NotTo(HaveOccurred()) }) + It("should exceed active deadline", func() { + By("Creating a job") + var activeDeadlineSeconds int64 = 1 + job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds) + job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) + Expect(err).NotTo(HaveOccurred()) + By("Ensuring job past active deadline") + err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+10)*time.Second, "DeadlineExceeded") + Expect(err).NotTo(HaveOccurred()) + }) + It("should delete a job", func() { By("Creating a job") - job := 
framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions) + job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil) job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) @@ -110,7 +121,7 @@ var _ = SIGDescribe("Job", func() { It("should adopt matching orphans and release non-matching pods", func() { By("Creating a job") - job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions) + job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil) // Replace job with the one returned from Create() so it has the UID. // Save Kind since it won't be populated in the returned job. kind := job.Kind diff --git a/test/e2e/framework/jobs_util.go b/test/e2e/framework/jobs_util.go index fa7fe71c016..778b507fc6b 100644 --- a/test/e2e/framework/jobs_util.go +++ b/test/e2e/framework/jobs_util.go @@ -43,7 +43,7 @@ const ( // first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart // policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the // Job's required number of completions. 
-func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32) *batch.Job { +func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64) *batch.Job { job := &batch.Job{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -52,9 +52,10 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl Kind: "Job", }, Spec: batch.JobSpec{ - Parallelism: &parallelism, - Completions: &completions, - ManualSelector: newBool(false), + ActiveDeadlineSeconds: activeDeadlineSeconds, + Parallelism: &parallelism, + Completions: &completions, + ManualSelector: newBool(false), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{JobSelectorKey: name}, @@ -191,7 +192,7 @@ func WaitForJobFinish(c clientset.Interface, ns, jobName string, completions int } // WaitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. -func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration) error { +func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error { return wait.Poll(Poll, timeout, func() (bool, error) { curr, err := c.Batch().Jobs(ns).Get(jobName, metav1.GetOptions{}) if err != nil { @@ -199,7 +200,9 @@ func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.D } for _, c := range curr.Status.Conditions { if c.Type == batch.JobFailed && c.Status == v1.ConditionTrue { - return true, nil + if reason == "" || reason == c.Reason { + return true, nil + } } } return false, nil diff --git a/test/e2e/network_partition.go b/test/e2e/network_partition.go index 30f3b5f2514..978b3a23fa6 100644 --- a/test/e2e/network_partition.go +++ b/test/e2e/network_partition.go @@ -420,7 +420,7 @@ var _ = framework.KubeDescribe("[sig-apps] Network Partition [Disruptive] [Slow] completions := int32(4) job := 
framework.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever, - parallelism, completions) + parallelism, completions, nil) job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) label := labels.SelectorFromSet(labels.Set(map[string]string{framework.JobSelectorKey: job.Name})) diff --git a/test/e2e/upgrades/apps/job.go b/test/e2e/upgrades/apps/job.go index 1fd2b1a28b7..da5793b9bb9 100644 --- a/test/e2e/upgrades/apps/job.go +++ b/test/e2e/upgrades/apps/job.go @@ -39,7 +39,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) { t.namespace = f.Namespace.Name By("Creating a job") - t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2) + t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil) job, err := framework.CreateJob(f.ClientSet, t.namespace, t.job) t.job = job Expect(err).NotTo(HaveOccurred())