Merge pull request #107470 from jlsong01/optimize_cj_status_update

Optimize cronjob controller status updates
This commit is contained in:
Kubernetes Prow Robot 2022-02-01 00:14:18 -08:00 committed by GitHub
commit 954fa57f0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 191 additions and 118 deletions

View File

@ -189,17 +189,29 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
return nil, err return nil, err
} }
cronJobCopy, requeueAfter, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled) cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
if err != nil { if err != nil {
klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
if updateStatus {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err
}
}
return nil, err return nil, err
} }
err = jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) if jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) {
if err != nil { updateStatus = true
klog.V(2).InfoS("Error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err) }
// Update the CronJob if needed
if updateStatus {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err return nil, err
} }
}
if requeueAfter != nil { if requeueAfter != nil {
klog.V(4).InfoS("Re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter) klog.V(4).InfoS("Re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter)
@ -399,47 +411,52 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
} }
// syncCronJob reconciles a CronJob with a list of any Jobs that it created. // syncCronJob reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cj" should be included in "js". // All known jobs created by "cronJob" should be included in "jobs".
// The current time is passed in to facilitate testing. // The current time is passed in to facilitate testing.
// It returns a copy of the CronJob that is to be used by other functions // It returns a copy of the CronJob that is to be used by other functions
// that mutates the object // that mutates the object
// It also returns a bool to indicate an update to api-server is needed
func (jm *ControllerV2) syncCronJob( func (jm *ControllerV2) syncCronJob(
ctx context.Context, ctx context.Context,
cj *batchv1.CronJob, cronJob *batchv1.CronJob,
js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) { jobs []*batchv1.Job) (*batchv1.CronJob, *time.Duration, bool, error) {
cj = cj.DeepCopy() cronJob = cronJob.DeepCopy()
now := jm.now() now := jm.now()
updateStatus := false
childrenJobs := make(map[types.UID]bool) childrenJobs := make(map[types.UID]bool)
for _, j := range js { for _, j := range jobs {
childrenJobs[j.ObjectMeta.UID] = true childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(*cj, j.ObjectMeta.UID) found := inActiveList(*cronJob, j.ObjectMeta.UID)
if !found && !IsJobFinished(j) { if !found && !IsJobFinished(j) {
cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cj.Namespace, cj.Name) cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, updateStatus, err
} }
if inActiveList(*cjCopy, j.ObjectMeta.UID) { if inActiveList(*cjCopy, j.ObjectMeta.UID) {
cj = cjCopy cronJob = cjCopy
continue continue
} }
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
// We found an unfinished job that has us as the parent, but it is not in our Active list. // We found an unfinished job that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status, // This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
// a job that they wanted us to adopt. // a job that they wanted us to adopt.
} else if found && IsJobFinished(j) { } else if found && IsJobFinished(j) {
_, status := getFinishedStatus(j) _, status := getFinishedStatus(j)
deleteFromActiveList(cj, j.ObjectMeta.UID) deleteFromActiveList(cronJob, j.ObjectMeta.UID)
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
updateStatus = true
} else if IsJobFinished(j) { } else if IsJobFinished(j) {
// a job does not have to be in active list, as long as it is finished, we will process the timestamp // a job does not have to be in active list, as long as it is finished, we will process the timestamp
if cj.Status.LastSuccessfulTime == nil { if cronJob.Status.LastSuccessfulTime == nil {
cj.Status.LastSuccessfulTime = j.Status.CompletionTime cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
updateStatus = true
} }
if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cj.Status.LastSuccessfulTime.Time) { if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cronJob.Status.LastSuccessfulTime.Time) {
cj.Status.LastSuccessfulTime = j.Status.CompletionTime cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
updateStatus = true
} }
} }
} }
@ -447,7 +464,7 @@ func (jm *ControllerV2) syncCronJob(
// Remove any job reference from the active list if the corresponding job does not exist any more. // Remove any job reference from the active list if the corresponding job does not exist any more.
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
// job running. // job running.
for _, j := range cj.Status.Active { for _, j := range cronJob.Status.Active {
_, found := childrenJobs[j.UID] _, found := childrenJobs[j.UID]
if found { if found {
continue continue
@ -459,70 +476,64 @@ func (jm *ControllerV2) syncCronJob(
case errors.IsNotFound(err): case errors.IsNotFound(err):
// The job is actually missing, delete from active list and schedule a new one if within // The job is actually missing, delete from active list and schedule a new one if within
// deadline // deadline
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(cj, j.UID) deleteFromActiveList(cronJob, j.UID)
updateStatus = true
case err != nil: case err != nil:
return cj, nil, err return cronJob, nil, updateStatus, err
} }
// the job is missing in the lister but found in api-server // the job is missing in the lister but found in api-server
} }
updatedCJ, err := jm.cronJobControl.UpdateStatus(ctx, cj) if cronJob.DeletionTimestamp != nil {
if err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
return cj, nil, err
}
*cj = *updatedCJ
if cj.DeletionTimestamp != nil {
// The CronJob is being deleted. // The CronJob is being deleted.
// Don't do anything other than updating status. // Don't do anything other than updating status.
return cj, nil, nil return cronJob, nil, updateStatus, nil
} }
if cj.Spec.Suspend != nil && *cj.Spec.Suspend { if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
return cj, nil, nil return cronJob, nil, updateStatus, nil
} }
sched, err := cron.ParseStandard(cj.Spec.Schedule) sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
if err != nil { if err != nil {
// this is likely a user error in defining the spec value // this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec // we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err) klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cj.Spec.Schedule, err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
return cj, nil, nil return cronJob, nil, updateStatus, nil
} }
if strings.Contains(cj.Spec.Schedule, "TZ") { if strings.Contains(cronJob.Spec.Schedule, "TZ") {
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cj.Spec.Schedule) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cronJob.Spec.Schedule)
} }
scheduledTime, err := getNextScheduleTime(*cj, now, sched, jm.recorder) scheduledTime, err := getNextScheduleTime(*cronJob, now, sched, jm.recorder)
if err != nil { if err != nil {
// this is likely a user error in defining the spec value // this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec // we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err) klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cj.Spec.Schedule, err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
return cj, nil, nil return cronJob, nil, updateStatus, nil
} }
if scheduledTime == nil { if scheduledTime == nil {
// no unmet start time, return cj,. // no unmet start time, return cj,.
// The only time this should happen is if queue is filled after restart. // The only time this should happen is if queue is filled after restart.
// Otherwise, the queue is always suppose to trigger sync function at the time of // Otherwise, the queue is always suppose to trigger sync function at the time of
// the scheduled time, that will give atleast 1 unmet time schedule // the scheduled time, that will give atleast 1 unmet time schedule
klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return cj, t, nil return cronJob, t, updateStatus, nil
} }
tooLate := false tooLate := false
if cj.Spec.StartingDeadlineSeconds != nil { if cronJob.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now) tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
} }
if tooLate { if tooLate {
klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z)) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
// the miss every cycle. In order to avoid sending multiple events, and to avoid processing // the miss every cycle. In order to avoid sending multiple events, and to avoid processing
@ -532,18 +543,18 @@ func (jm *ControllerV2) syncCronJob(
// and event the next time we process it, and also so the user looking at the status // and event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution. // can see easily that there was a missed execution.
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return cj, t, nil return cronJob, t, updateStatus, nil
} }
if isJobInActiveList(&batchv1.Job{ if isJobInActiveList(&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: getJobName(cj, *scheduledTime), Name: getJobName(cronJob, *scheduledTime),
Namespace: cj.Namespace, Namespace: cronJob.Namespace,
}}, cj.Status.Active) || cj.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) { }}, cronJob.Status.Active) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", scheduledTime) klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", scheduledTime)
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return cj, t, nil return cronJob, t, updateStatus, nil
} }
if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 { if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs, // Regardless which source of information we use for the set of active jobs,
// there is some risk that we won't see an active job when there is one. // there is some risk that we won't see an active job when there is one.
// (because we haven't seen the status update to the SJ or the created pod). // (because we haven't seen the status update to the SJ or the created pod).
@ -553,47 +564,48 @@ func (jm *ControllerV2) syncCronJob(
// TODO: for Forbid, we could use the same name for every execution, as a lock. // TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time. // With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs. // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid") jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return cj, t, nil return cronJob, t, updateStatus, nil
} }
if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent { if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cj.Status.Active { for _, j := range cronJob.Status.Active {
klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name)) klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
job, err := jm.jobControl.GetJob(j.Namespace, j.Name) job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
if err != nil { if err != nil {
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return cj, nil, err return cronJob, nil, updateStatus, err
} }
if !deleteJob(cj, job, jm.jobControl, jm.recorder) { if !deleteJob(cronJob, job, jm.jobControl, jm.recorder) {
return cj, nil, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name) return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
} }
updateStatus = true
} }
} }
jobReq, err := getJobFromTemplate2(cj, *scheduledTime) jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
if err != nil { if err != nil {
klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
return cj, nil, err return cronJob, nil, updateStatus, err
} }
jobResp, err := jm.jobControl.CreateJob(cj.Namespace, jobReq) jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
switch { switch {
case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause): case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
case errors.IsAlreadyExists(err): case errors.IsAlreadyExists(err):
// If the job is created by other actor, assume it has updated the cronjob status accordingly // If the job is created by other actor, assume it has updated the cronjob status accordingly
klog.InfoS("Job already exists", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName())) klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
return cj, nil, err return cronJob, nil, updateStatus, err
case err != nil: case err != nil:
// default error handling // default error handling
jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return cj, nil, err return cronJob, nil, updateStatus, err
} }
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds()) metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ // // ------------------------------------------------------------------ //
@ -608,19 +620,15 @@ func (jm *ControllerV2) syncCronJob(
// Add the just-started job to the status list. // Add the just-started job to the status list.
jobRef, err := getRef(jobResp) jobRef, err := getRef(jobResp)
if err != nil { if err != nil {
klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "err", err) klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
return cj, nil, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cj.GetNamespace(), cj.GetName())) return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
}
cj.Status.Active = append(cj.Status.Active, *jobRef)
cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updatedCJ, err = jm.cronJobControl.UpdateStatus(ctx, cj)
if err != nil {
klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err)
} }
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true
t := nextScheduledTimeDuration(sched, now) t := nextScheduledTimeDuration(sched, now)
return updatedCJ, t, nil return cronJob, t, updateStatus, nil
} }
func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string { func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
@ -638,12 +646,14 @@ func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duratio
} }
// cleanupFinishedJobs cleanups finished jobs created by a CronJob // cleanupFinishedJobs cleanups finished jobs created by a CronJob
func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) error { // It returns a bool to indicate an update to api-server is needed
func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) bool {
// If neither limits are active, there is no need to do anything. // If neither limits are active, there is no need to do anything.
if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
return nil return false
} }
updateStatus := false
failedJobs := []*batchv1.Job{} failedJobs := []*batchv1.Job{}
successfulJobs := []*batchv1.Job{} successfulJobs := []*batchv1.Job{}
@ -656,21 +666,21 @@ func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.Cro
} }
} }
if cj.Spec.SuccessfulJobsHistoryLimit != nil { if cj.Spec.SuccessfulJobsHistoryLimit != nil &&
jm.removeOldestJobs(cj, jm.removeOldestJobs(cj,
successfulJobs, successfulJobs,
*cj.Spec.SuccessfulJobsHistoryLimit) *cj.Spec.SuccessfulJobsHistoryLimit) {
updateStatus = true
} }
if cj.Spec.FailedJobsHistoryLimit != nil { if cj.Spec.FailedJobsHistoryLimit != nil &&
jm.removeOldestJobs(cj, jm.removeOldestJobs(cj,
failedJobs, failedJobs,
*cj.Spec.FailedJobsHistoryLimit) *cj.Spec.FailedJobsHistoryLimit) {
updateStatus = true
} }
// Update the CronJob, in case jobs were removed from the list. return updateStatus
_, err := jm.cronJobControl.UpdateStatus(ctx, cj)
return err
} }
func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) { func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) {
@ -683,10 +693,11 @@ func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobCond
} }
// removeOldestJobs removes the oldest jobs from a list of jobs // removeOldestJobs removes the oldest jobs from a list of jobs
func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) { func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool {
updateStatus := false
numToDelete := len(js) - int(maxJobs) numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 { if numToDelete <= 0 {
return return updateStatus
} }
klog.V(4).InfoS("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
@ -694,9 +705,12 @@ func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job,
sort.Sort(byJobStartTimeStar(js)) sort.Sort(byJobStartTimeStar(js))
for i := 0; i < numToDelete; i++ { for i := 0; i < numToDelete; i++ {
klog.V(4).InfoS("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) klog.V(4).InfoS("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
deleteJob(cj, js[i], jm.jobControl, jm.recorder) if deleteJob(cj, js[i], jm.jobControl, jm.recorder) {
updateStatus = true
} }
} }
return updateStatus
}
// isJobInActiveList take a job and checks if activeJobs has a job with the same // isJobInActiveList take a job and checks if activeJobs has a job with the same
// name and namespace. // name and namespace.

View File

@ -167,12 +167,12 @@ func TestControllerV2SyncCronJob(t *testing.T) {
// cj status // cj status
ranPreviously bool ranPreviously bool
stillActive bool stillActive bool
jobPresentInCJActiveStatus bool
jobCreationTime time.Time
// environment // environment
jobCreationTime time.Time
now time.Time now time.Time
jobCreateError error
jobGetErr error
// expectations // expectations
expectCreate bool expectCreate bool
@ -181,8 +181,9 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectedWarnings int expectedWarnings int
expectErr bool expectErr bool
expectRequeueAfter bool expectRequeueAfter bool
expectUpdateStatus bool
jobStillNotFoundInLister bool jobStillNotFoundInLister bool
jobCreateError error jobPresentInCJActiveStatus bool
}{ }{
"never ran, not valid schedule, A": { "never ran, not valid schedule, A": {
concurrencyPolicy: "Allow", concurrencyPolicy: "Allow",
@ -246,6 +247,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"never ran, is time, F": { "never ran, is time, F": {
@ -257,6 +259,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"never ran, is time, R": { "never ran, is time, R": {
@ -268,6 +271,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"never ran, is time, suspended": { "never ran, is time, suspended": {
@ -297,6 +301,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -308,6 +313,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
jobCreationTime: justAfterThePriorHour(), jobCreationTime: justAfterThePriorHour(),
now: justBeforeTheHour(), now: justBeforeTheHour(),
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, not time, F": { "prev ran but done, not time, F": {
@ -318,6 +324,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
jobCreationTime: justAfterThePriorHour(), jobCreationTime: justAfterThePriorHour(),
now: justBeforeTheHour(), now: justBeforeTheHour(),
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, not time, R": { "prev ran but done, not time, R": {
@ -328,6 +335,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
jobCreationTime: justAfterThePriorHour(), jobCreationTime: justAfterThePriorHour(),
now: justBeforeTheHour(), now: justBeforeTheHour(),
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, is time, A": { "prev ran but done, is time, A": {
@ -340,6 +348,19 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true,
},
"prev ran but done, is time, create job failed, A": {
concurrencyPolicy: "Allow",
schedule: onTheHour,
deadline: noDead,
ranPreviously: true,
jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(),
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
expectErr: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, is time, F": { "prev ran but done, is time, F": {
@ -352,6 +373,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, is time, R": { "prev ran but done, is time, R": {
@ -364,6 +386,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, is time, suspended": { "prev ran but done, is time, suspended": {
@ -374,6 +397,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
ranPreviously: true, ranPreviously: true,
jobCreationTime: justAfterThePriorHour(), jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(), now: *justAfterTheHour(),
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, is time, past deadline": { "prev ran but done, is time, past deadline": {
@ -384,6 +408,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
jobCreationTime: justAfterThePriorHour(), jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(), now: *justAfterTheHour(),
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, is time, not past deadline": { "prev ran but done, is time, not past deadline": {
@ -396,6 +421,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -446,6 +472,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 2, expectActive: 2,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"still active, is time, F": { "still active, is time, F": {
@ -472,6 +499,20 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectDelete: true, expectDelete: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true,
},
"still active, is time, get job failed, R": {
concurrencyPolicy: "Replace",
schedule: onTheHour,
deadline: noDead,
ranPreviously: true,
stillActive: true,
jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(),
jobGetErr: errors.NewBadRequest("request is invalid"),
expectActive: 1,
expectedWarnings: 1,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"still active, is time, suspended": { "still active, is time, suspended": {
@ -509,6 +550,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 2, expectActive: 2,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -525,6 +567,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectActive: 1, expectActive: 1,
expectedWarnings: 1, expectedWarnings: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, not past deadline, R": { "prev ran but done, long overdue, not past deadline, R": {
@ -538,6 +581,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectActive: 1, expectActive: 1,
expectedWarnings: 1, expectedWarnings: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, not past deadline, F": { "prev ran but done, long overdue, not past deadline, F": {
@ -551,6 +595,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectActive: 1, expectActive: 1,
expectedWarnings: 1, expectedWarnings: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, no deadline, A": { "prev ran but done, long overdue, no deadline, A": {
@ -564,6 +609,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectActive: 1, expectActive: 1,
expectedWarnings: 1, expectedWarnings: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, no deadline, R": { "prev ran but done, long overdue, no deadline, R": {
@ -577,6 +623,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectActive: 1, expectActive: 1,
expectedWarnings: 1, expectedWarnings: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, no deadline, F": { "prev ran but done, long overdue, no deadline, F": {
@ -590,6 +637,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectActive: 1, expectActive: 1,
expectedWarnings: 1, expectedWarnings: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -603,6 +651,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, past short deadline, A": { "prev ran but done, long overdue, past short deadline, A": {
@ -615,6 +664,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -628,6 +678,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, past short deadline, R": { "prev ran but done, long overdue, past short deadline, R": {
@ -640,6 +691,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -653,6 +705,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
"prev ran but done, long overdue, past short deadline, F": { "prev ran but done, long overdue, past short deadline, F": {
@ -665,6 +718,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
expectCreate: true, expectCreate: true,
expectActive: 1, expectActive: 1,
expectRequeueAfter: true, expectRequeueAfter: true,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true, jobPresentInCJActiveStatus: true,
}, },
@ -677,8 +731,10 @@ func TestControllerV2SyncCronJob(t *testing.T) {
ranPreviously: true, ranPreviously: true,
jobCreationTime: *justAfterTheHour(), jobCreationTime: *justAfterTheHour(),
now: justBeforeTheHour(), now: justBeforeTheHour(),
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "jobs", Group: "batch"}, ""),
expectRequeueAfter: true, expectRequeueAfter: true,
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "jobs", Group: "batch"}, "")}, expectUpdateStatus: true,
},
// Tests for slow job lister // Tests for slow job lister
"this started but went missing, not past deadline, A": { "this started but went missing, not past deadline, A": {
@ -812,7 +868,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
} }
} }
jc := &fakeJobControl{Job: job, CreateErr: tc.jobCreateError} jc := &fakeJobControl{Job: job, CreateErr: tc.jobCreateError, Err: tc.jobGetErr}
cjc := &fakeCJControl{CronJob: realCJ} cjc := &fakeCJControl{CronJob: realCJ}
recorder := record.NewFakeRecorder(10) recorder := record.NewFakeRecorder(10)
@ -824,7 +880,7 @@ func TestControllerV2SyncCronJob(t *testing.T) {
return tc.now return tc.now
}, },
} }
cjCopy, requeueAfter, err := jm.syncCronJob(context.TODO(), &cj, js) cjCopy, requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), &cj, js)
if tc.expectErr && err == nil { if tc.expectErr && err == nil {
t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter) t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter)
} }
@ -838,6 +894,9 @@ func TestControllerV2SyncCronJob(t *testing.T) {
t.Errorf("%s: expected requeueAfter: %+v, got requeueAfter time: %+v", name, expectedRequeueAfter, requeueAfter) t.Errorf("%s: expected requeueAfter: %+v, got requeueAfter time: %+v", name, expectedRequeueAfter, requeueAfter)
} }
} }
if updateStatus != tc.expectUpdateStatus {
t.Errorf("%s: expected updateStatus: %t, actually: %t", name, tc.expectUpdateStatus, updateStatus)
}
expectedCreates := 0 expectedCreates := 0
if tc.expectCreate { if tc.expectCreate {
expectedCreates = 1 expectedCreates = 1