Added ActiveDeadlineSeconds to jobs, allowing a job to be failed after exceeding the allowed time.
Maciej Szulik
2015-11-26 16:54:04 +01:00
parent efc821a14c
commit 327c104460
21 changed files with 16070 additions and 14806 deletions
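
For context on how the new field is meant to be used, here is a minimal sketch of a job that opts into the deadline (the job name and the ten-second value are illustrative assumptions, not taken from this commit; api and extensions are the same packages used in the diff below):

    // Sketch: a Job that the controller will actively terminate once it has
    // been active for longer than ActiveDeadlineSeconds.
    deadline := int64(10) // assumed value for illustration
    job := extensions.Job{
        ObjectMeta: api.ObjectMeta{Name: "example", Namespace: api.NamespaceDefault},
        Spec: extensions.JobSpec{
            ActiveDeadlineSeconds: &deadline,
        },
    }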


@@ -64,6 +64,8 @@ type JobController struct {
     // Jobs that need to be updated
     queue *workqueue.Type
+
+    recorder record.EventRecorder
 }
@@ -75,10 +77,11 @@ func NewJobController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
         kubeClient: kubeClient,
         podControl: controller.RealPodControl{
             KubeClient: kubeClient,
-            Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "job"}),
+            Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
         },
         expectations: controller.NewControllerExpectations(),
         queue:        workqueue.New(),
+        recorder:     eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
     }

     jm.jobStore.Store, jm.jobController = framework.NewInformer(
@@ -322,16 +325,52 @@ func (jm *JobController) syncJob(key string) error {
     activePods := controller.FilterActivePods(podList.Items)
     active := len(activePods)
     succeeded, failed := getStatus(podList.Items)
-    if jobNeedsSync {
-        active = jm.manageJob(activePods, succeeded, &job)
+    conditions := len(job.Status.Conditions)
+    if job.Status.StartTime == nil {
+        now := unversioned.Now()
+        job.Status.StartTime = &now
     }
-    completions := succeeded
-    if completions == *job.Spec.Completions {
-        job.Status.Conditions = append(job.Status.Conditions, newCondition())
+    if pastActiveDeadline(&job) {
+        // if job was finished previously, we don't want to redo the termination
+        if isJobFinished(&job) {
+            return nil
+        }
+        // TODO: below code should be replaced with pod termination resulting in
+        // pod failures, rather than killing pods. Unfortunately none such solution
+        // exists ATM. There's an open discussion in the topic in
+        // https://github.com/kubernetes/kubernetes/issues/14602 which might give
+        // some sort of solution to above problem.
+        // kill remaining active pods
+        wait := sync.WaitGroup{}
+        wait.Add(active)
+        for i := 0; i < active; i++ {
+            go func(ix int) {
+                defer wait.Done()
+                if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name); err != nil {
+                    defer util.HandleError(err)
+                }
+            }(i)
+        }
+        wait.Wait()
+        // update status values accordingly
+        failed += active
+        active = 0
+        job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
+        jm.recorder.Event(&job, api.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
+    } else {
+        if jobNeedsSync {
+            active = jm.manageJob(activePods, succeeded, &job)
+        }
+        completions := succeeded
+        if completions == *job.Spec.Completions {
+            job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobComplete, "", ""))
+            now := unversioned.Now()
+            job.Status.CompletionTime = &now
+        }
+    }

     // no need to update the job if the status hasn't changed since last time
-    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed {
+    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
         job.Status.Active = active
         job.Status.Succeeded = succeeded
         job.Status.Failed = failed
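
Once the deadline branch above fires, the appended condition comes out of newCondition looking roughly like this (a sketch of the resulting value assembled from the call site; the two timestamp fields are filled with the current time):

    extensions.JobCondition{
        Type:    extensions.JobFailed,
        Status:  api.ConditionTrue,
        Reason:  "DeadlineExceeded",
        Message: "Job was active longer than specified deadline",
    }

Together with failed += active and active = 0, this new condition is exactly what the extended comparison (len(job.Status.Conditions) != conditions) is there to catch, so the status update is not skipped when only a condition changed.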
@@ -344,21 +383,38 @@ func (jm *JobController) syncJob(key string) error {
     return nil
 }

-func newCondition() extensions.JobCondition {
+// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
+func pastActiveDeadline(job *extensions.Job) bool {
+    if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
+        return false
+    }
+    now := unversioned.Now()
+    start := job.Status.StartTime.Time
+    duration := now.Time.Sub(start)
+    allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
+    return duration >= allowedDuration
+}
+
+func newCondition(conditionType extensions.JobConditionType, reason, message string) extensions.JobCondition {
     return extensions.JobCondition{
-        Type:               extensions.JobComplete,
+        Type:               conditionType,
         Status:             api.ConditionTrue,
         LastProbeTime:      unversioned.Now(),
        LastTransitionTime: unversioned.Now(),
+        Reason:             reason,
+        Message:            message,
     }
 }

+// getStatus returns no of succeeded and failed pods running a job
 func getStatus(pods []api.Pod) (succeeded, failed int) {
     succeeded = filterPods(pods, api.PodSucceeded)
     failed = filterPods(pods, api.PodFailed)
     return
 }

+// manageJob is the core method responsible for managing the number of running
+// pods according to what is specified in the job.Spec.
 func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *extensions.Job) int {
     var activeLock sync.Mutex
     active := len(activePods)
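
As a quick sanity check of the arithmetic in pastActiveDeadline, here is a hypothetical snippet (not part of the commit) exercising it with the same packages this file already imports, plus fmt:

    // A job that started ten minutes ago with a five-minute deadline is past
    // its active deadline; a nil deadline or unset StartTime returns false.
    start := unversioned.NewTime(time.Now().Add(-10 * time.Minute))
    deadline := int64(300) // five minutes
    job := extensions.Job{
        Spec:   extensions.JobSpec{ActiveDeadlineSeconds: &deadline},
        Status: extensions.JobStatus{StartTime: &start},
    }
    fmt.Println(pastActiveDeadline(&job)) // true: 10m elapsed >= 5m allowed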
@@ -447,7 +503,7 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int {
 func isJobFinished(j *extensions.Job) bool {
     for _, c := range j.Status.Conditions {
-        if c.Type == extensions.JobComplete && c.Status == api.ConditionTrue {
+        if (c.Type == extensions.JobComplete || c.Type == extensions.JobFailed) && c.Status == api.ConditionTrue {
             return true
         }
     }
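
With the broadened check, a job killed for exceeding its deadline is treated as finished exactly like a successfully completed one. A caller that needs to tell the two outcomes apart can inspect the condition type, e.g. with this hypothetical helper (not part of the commit):

    // jobFailedReason reports whether a finished job failed and why, based on
    // the same condition types isJobFinished now compares against.
    func jobFailedReason(j *extensions.Job) (failed bool, reason string) {
        for _, c := range j.Status.Conditions {
            if c.Type == extensions.JobFailed && c.Status == api.ConditionTrue {
                return true, c.Reason // "DeadlineExceeded" for the new deadline path
            }
        }
        return false, ""
    }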