diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index d818f0e85b4..6c0bd30fcfa 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -429,6 +429,8 @@ func (jm *JobController) syncJob(key string) error { if IsJobFinished(&job) { return nil } + + var manageJobErr error if pastActiveDeadline(&job) { // TODO: below code should be replaced with pod termination resulting in // pod failures, rather than killing pods. Unfortunately none such solution @@ -437,16 +439,28 @@ func (jm *JobController) syncJob(key string) error { // some sort of solution to above problem. // kill remaining active pods wait := sync.WaitGroup{} + errCh := make(chan error, int(active)) wait.Add(int(active)) for i := int32(0); i < active; i++ { go func(ix int32) { defer wait.Done() if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil { defer utilruntime.HandleError(err) + glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name) + errCh <- err } }(i) } wait.Wait() + + select { + case manageJobErr = <-errCh: + if manageJobErr != nil { + break + } + default: + } + // update status values accordingly failed += active active = 0 @@ -454,7 +468,7 @@ func (jm *JobController) syncJob(key string) error { jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline") } else { if jobNeedsSync && job.DeletionTimestamp == nil { - active = jm.manageJob(activePods, succeeded, &job) + active, manageJobErr = jm.manageJob(activePods, succeeded, &job) } completions := succeeded complete := false @@ -500,7 +514,7 @@ func (jm *JobController) syncJob(key string) error { return err } } - return nil + return manageJobErr } // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded. 
@@ -536,18 +550,20 @@ func getStatus(pods []*v1.Pod) (succeeded, failed int32) { // manageJob is the core method responsible for managing the number of running // pods according to what is specified in the job.Spec. // Does NOT modify activePods. -func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) int32 { +func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) { var activeLock sync.Mutex active := int32(len(activePods)) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) - return 0 + return 0, nil } + var errCh chan error if active > parallelism { diff := active - parallelism + errCh = make(chan error, diff) jm.expectations.ExpectDeletions(jobKey, int(diff)) glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) // Sort the pods in the order such that not-ready < ready, unscheduled @@ -564,10 +580,12 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil { defer utilruntime.HandleError(err) // Decrement the expected number of deletes because the informer won't observe this deletion + glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name) jm.expectations.DeletionObserved(jobKey) activeLock.Lock() active++ activeLock.Unlock() + errCh <- err } }(i) } @@ -598,6 +616,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b diff = 0 } jm.expectations.ExpectCreations(jobKey, int(diff)) + errCh = make(chan error, diff) glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff) active += diff @@ -609,17 +628,28 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, 
job *b if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil { defer utilruntime.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod + glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name) jm.expectations.CreationObserved(jobKey) activeLock.Lock() active-- activeLock.Unlock() + errCh <- err } }() } wait.Wait() } - return active + select { + case err := <-errCh: + // all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time. + if err != nil { + return active, err + } + default: + } + + return active, nil } func (jm *JobController) updateJobStatus(job *batch.Job) error { diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go index 0eadda6f14c..206d3816b14 100644 --- a/pkg/controller/job/jobcontroller_test.go +++ b/pkg/controller/job/jobcontroller_test.go @@ -268,8 +268,16 @@ func TestControllerSyncJob(t *testing.T) { // run err := manager.syncJob(getKey(job, t)) - if err != nil { - t.Errorf("%s: unexpected error when syncing jobs %v", name, err) + + // We need requeue syncJob task if podController error + if tc.podControllerError != nil { + if err == nil { + t.Errorf("%s: Syncing jobs would return error when podController exception", name) + } + } else { + if err != nil { + t.Errorf("%s: unexpected error when syncing jobs %v", name, err) + } } // validate created/deleted pods