Merge pull request #45664 from tacy/fix45213

Automatic merge from submit-queue (batch tested with PRs 45664, 45861)

Fix #45213: syncing jobs should return an error when a podControl operation fails

**What this PR does / why we need it**:
Job controller: syncing a job now returns an error when a podControl (pod create/delete) operation fails, so the job is requeued and retried instead of the failure being silently dropped.
**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #
fixes #45213 
**Special notes for your reviewer**:

**Release note**:

```release-note
```
Kubernetes Submit Queue 2017-05-15 21:08:48 -07:00 committed by GitHub
commit 746f5d6a28
2 changed files with 45 additions and 7 deletions
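In essence, the change below stops swallowing podControl failures inside the deletion/creation goroutines: each goroutine reports its error into a buffered channel, and after `wait.Wait()` the first error (if any) is surfaced to `syncJob`'s caller so the job gets requeued. The standalone sketch below illustrates that fan-out/collect pattern with a hypothetical `deleteAll` helper and a `fakeDelete` callback standing in for `podControl.DeletePod`; it is not the controller code itself.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// deleteAll fans out one goroutine per pod name and collects any failures in a
// buffered channel, mirroring the pattern this fix introduces in syncJob/manageJob.
// fakeDelete is a stand-in for podControl.DeletePod.
func deleteAll(pods []string, fakeDelete func(name string) error) error {
	wait := sync.WaitGroup{}
	errCh := make(chan error, len(pods)) // buffered so no goroutine blocks on send
	wait.Add(len(pods))
	for i := range pods {
		go func(ix int) {
			defer wait.Done()
			if err := fakeDelete(pods[ix]); err != nil {
				errCh <- err
			}
		}(i)
	}
	wait.Wait()

	// Non-blocking receive: report the first error (if any) to the caller,
	// which is enough to get the work item requeued and retried.
	select {
	case err := <-errCh:
		return err
	default:
	}
	return nil
}

func main() {
	failing := func(name string) error {
		if name == "pod-1" {
			return errors.New("fake pod control error")
		}
		return nil
	}
	fmt.Println(deleteAll([]string{"pod-0", "pod-1", "pod-2"}, failing))
}
```

Sizing the channel to the number of goroutines guarantees that no sender blocks, and the `select` with a `default` case makes the final receive non-blocking when nothing failed.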


@@ -429,6 +429,8 @@ func (jm *JobController) syncJob(key string) error {
if IsJobFinished(&job) {
return nil
}
var manageJobErr error
if pastActiveDeadline(&job) {
// TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution
@@ -437,16 +439,28 @@ func (jm *JobController) syncJob(key string) error {
// some sort of solution to above problem.
// kill remaining active pods
wait := sync.WaitGroup{}
errCh := make(chan error, int(active))
wait.Add(int(active))
for i := int32(0); i < active; i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
defer utilruntime.HandleError(err)
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
errCh <- err
}
}(i)
}
wait.Wait()
select {
case manageJobErr = <-errCh:
if manageJobErr != nil {
break
}
default:
}
// update status values accordingly
failed += active
active = 0
@@ -454,7 +468,7 @@ func (jm *JobController) syncJob(key string) error {
jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
} else {
if jobNeedsSync && job.DeletionTimestamp == nil {
active = jm.manageJob(activePods, succeeded, &job)
active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
}
completions := succeeded
complete := false
@@ -500,7 +514,7 @@ func (jm *JobController) syncJob(key string) error {
return err
}
}
return nil
return manageJobErr
}
// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
@@ -536,18 +550,20 @@ func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) int32 {
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
var activeLock sync.Mutex
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return 0
return 0, nil
}
var errCh chan error
if active > parallelism {
diff := active - parallelism
errCh = make(chan error, diff)
jm.expectations.ExpectDeletions(jobKey, int(diff))
glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
// Sort the pods in the order such that not-ready < ready, unscheduled
@@ -564,10 +580,12 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
jm.expectations.DeletionObserved(jobKey)
activeLock.Lock()
active++
activeLock.Unlock()
errCh <- err
}
}(i)
}
@@ -598,6 +616,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
diff = 0
}
jm.expectations.ExpectCreations(jobKey, int(diff))
errCh = make(chan error, diff)
glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
active += diff
@@ -609,17 +628,28 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
jm.expectations.CreationObserved(jobKey)
activeLock.Lock()
active--
activeLock.Unlock()
errCh <- err
}
}()
}
wait.Wait()
}
return active
select {
case err := <-errCh:
// All errors have been reported before; we only need to inform the controller that there was an error and that it should retry this job once more next time.
if err != nil {
return active, err
}
default:
}
return active, nil
}
func (jm *JobController) updateJobStatus(job *batch.Job) error {
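Why returning `manageJobErr` matters is visible one layer up, in the controller's worker loop: a non-nil return from the sync function is what causes the key to be requeued and retried, while a nil return forgets it. The sketch below is a generic worker loop in the style of the standard `k8s.io/client-go/util/workqueue` package, assuming a plain `func(key string) error` sync function and a hypothetical `processNextItem` helper; it is not the job controller's actual worker code.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextItem mimics a standard client-go worker loop: when the sync
// function returns an error, the key is requeued with rate limiting so the
// job is retried; on success the rate limiter forgets the key.
func processNextItem(queue workqueue.RateLimitingInterface, syncJob func(key string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := syncJob(key.(string)); err != nil {
		// The fix above makes syncJob surface podControl failures here,
		// which is what triggers this requeue path.
		fmt.Printf("error syncing %v, requeuing: %v\n", key, err)
		queue.AddRateLimited(key)
		return true
	}
	queue.Forget(key)
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()
	queue.Add("default/my-job")

	// One pass: the simulated failure causes the key to be requeued
	// (after a rate-limit delay) instead of being dropped.
	processNextItem(queue, func(key string) error {
		return fmt.Errorf("simulated pod control failure for %s", key)
	})
}
```

With the old behavior (returning nil even when pod operations failed), the key took the `Forget` path and the job was only reconciled again on the next pod event or resync.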


@@ -268,8 +268,16 @@ func TestControllerSyncJob(t *testing.T) {
// run
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
// syncJob must return an error (so the task is requeued) when the pod controller fails
if tc.podControllerError != nil {
if err == nil {
t.Errorf("%s: Syncing jobs would return error when podController exception", name)
}
} else {
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
}
}
// validate created/deleted pods
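The updated assertion branches on `tc.podControllerError`: when the fake pod control is set up to fail, `syncJob` must now return an error; otherwise any error is still unexpected. A minimal, self-contained table-driven sketch of that shape is shown below; `fakePodControl`, `syncOnce`, and the test name are invented for illustration and are not the types used in `TestControllerSyncJob`.

```go
package jobsketch

import (
	"errors"
	"testing"
)

// fakePodControl is a hypothetical stand-in for the test suite's pod control
// fake: every operation fails with Err when Err is set.
type fakePodControl struct {
	Err error
}

func (f *fakePodControl) DeletePod(namespace, name string) error { return f.Err }

// syncOnce is a toy sync function: like the fixed syncJob, it propagates a
// pod control failure to its caller instead of swallowing it.
func syncOnce(pc *fakePodControl) error {
	return pc.DeletePod("default", "pod-0")
}

// TestSyncPropagatesPodControlError mirrors the shape of the updated
// TestControllerSyncJob check: an injected pod control error must surface as a
// sync error, and no injected error must mean no sync error.
func TestSyncPropagatesPodControlError(t *testing.T) {
	testCases := map[string]struct {
		podControllerError error
	}{
		"pod control fails": {podControllerError: errors.New("fake pod control error")},
		"pod control ok":    {podControllerError: nil},
	}
	for name, tc := range testCases {
		err := syncOnce(&fakePodControl{Err: tc.podControllerError})
		if tc.podControllerError != nil && err == nil {
			t.Errorf("%s: expected syncOnce to return the pod control error, got nil", name)
		}
		if tc.podControllerError == nil && err != nil {
			t.Errorf("%s: unexpected error: %v", name, err)
		}
	}
}
```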