Merge pull request #90934 from iobuf/cronjob

CronJob: cleanup legacy ScheduledJob vars,docs
Commit fdbb960354 by Kubernetes Prow Robot on 2020-05-19 20:35:11 -07:00, committed by GitHub
7 changed files with 208 additions and 209 deletions


@@ -173,16 +173,16 @@ func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList
 }
 // ValidateCronJob validates a CronJob and returns an ErrorList with any errors.
-func ValidateCronJob(scheduledJob *batch.CronJob) field.ErrorList {
+func ValidateCronJob(cronJob *batch.CronJob) field.ErrorList {
 // CronJobs and rcs have the same name validation
-allErrs := apivalidation.ValidateObjectMeta(&scheduledJob.ObjectMeta, true, apivalidation.ValidateReplicationControllerName, field.NewPath("metadata"))
-allErrs = append(allErrs, ValidateCronJobSpec(&scheduledJob.Spec, field.NewPath("spec"))...)
-if len(scheduledJob.ObjectMeta.Name) > apimachineryvalidation.DNS1035LabelMaxLength-11 {
+allErrs := apivalidation.ValidateObjectMeta(&cronJob.ObjectMeta, true, apivalidation.ValidateReplicationControllerName, field.NewPath("metadata"))
+allErrs = append(allErrs, ValidateCronJobSpec(&cronJob.Spec, field.NewPath("spec"))...)
+if len(cronJob.ObjectMeta.Name) > apimachineryvalidation.DNS1035LabelMaxLength-11 {
 // The cronjob controller appends a 11-character suffix to the cronjob (`-$TIMESTAMP`) when
 // creating a job. The job name length limit is 63 characters.
 // Therefore cronjob names must have length <= 63-11=52. If we don't validate this here,
 // then job creation will fail later.
-allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), scheduledJob.ObjectMeta.Name, "must be no more than 52 characters"))
+allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), cronJob.ObjectMeta.Name, "must be no more than 52 characters"))
 }
 return allErrs
 }
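Note (illustration, not part of the diff): the name budget described in the comment above is simple arithmetic; a minimal sketch, assuming only the 63-character Job name limit and the 11-character "-$TIMESTAMP" suffix that the comment mentions:

package main

import "fmt"

func main() {
    const jobNameMaxLen = 63 // Job name length limit cited in the comment
    const suffixLen = 11     // "-" plus a 10-digit timestamp appended when a Job is created
    fmt.Println("max CronJob name length:", jobNameMaxLen-suffixLen) // prints 52
}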


@@ -18,14 +18,13 @@ package cronjob
 /*
 I did not use watch or expectations. Those add a lot of corner cases, and we aren't
-expecting a large volume of jobs or scheduledJobs. (We are favoring correctness
+expecting a large volume of jobs or cronJobs. (We are favoring correctness
 over scalability. If we find a single controller thread is too slow because
 there are a lot of Jobs or CronJobs, we can parallelize by Namespace.
 If we find the load on the API server is too high, we can use a watch and
 UndeltaStore.)
-Just periodically list jobs and SJs, and then reconcile them.
+Just periodically list jobs and cronJobs, and then reconcile them.
 */
 import (
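Note (illustration, not part of the diff): a rough, self-contained sketch of the "periodically list and reconcile" approach the comment describes; the types, helper names, and the 10-second period here are assumptions for illustration, not the controller's actual code:

package main

import (
    "fmt"
    "time"
)

type cronJob struct{ name string }
type job struct{ parentName string }

// Stand-ins for listing CronJobs and Jobs from the API server.
func listCronJobs() []cronJob { return []cronJob{{name: "nightly-backup"}} }
func listJobs() []job         { return []job{{parentName: "nightly-backup"}} }

// syncAll relists everything and reconciles each CronJob against the Jobs it owns,
// instead of reacting to individual watch events.
func syncAll() {
    jobsByParent := map[string][]job{}
    for _, j := range listJobs() {
        jobsByParent[j.parentName] = append(jobsByParent[j.parentName], j)
    }
    for _, cj := range listCronJobs() {
        fmt.Printf("reconciling %q against %d child jobs\n", cj.name, len(jobsByParent[cj.name]))
    }
}

func main() {
    for range time.Tick(10 * time.Second) { // poll on a fixed period rather than watch
        syncAll()
    }
}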
@@ -63,7 +62,7 @@ var controllerKind = batchv1beta1.SchemeGroupVersion.WithKind("CronJob")
 type Controller struct {
 kubeClient clientset.Interface
 jobControl jobControlInterface
-sjControl sjControlInterface
+cjControl cjControlInterface
 podControl podControlInterface
 recorder record.EventRecorder
 }
@@ -83,7 +82,7 @@ func NewController(kubeClient clientset.Interface) (*Controller, error) {
 jm := &Controller{
 kubeClient: kubeClient,
 jobControl: realJobControl{KubeClient: kubeClient},
-sjControl: &realSJControl{KubeClient: kubeClient},
+cjControl: &realCJControl{KubeClient: kubeClient},
 podControl: &realPodControl{KubeClient: kubeClient},
 recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
 }
@@ -131,15 +130,15 @@ func (jm *Controller) syncAll() {
 return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
 }
-jobsBySj := groupJobsByParent(js)
-klog.V(4).Infof("Found %d groups", len(jobsBySj))
+jobsByCj := groupJobsByParent(js)
+klog.V(4).Infof("Found %d groups", len(jobsByCj))
 err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
-sj, ok := object.(*batchv1beta1.CronJob)
+cj, ok := object.(*batchv1beta1.CronJob)
 if !ok {
-return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", sj)
+return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)
 }
-syncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
-cleanupFinishedJobs(sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
+syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
+cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
 return nil
 })
@@ -150,10 +149,10 @@ func (jm *Controller) syncAll() {
 }
 // cleanupFinishedJobs cleanups finished jobs created by a CronJob
-func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
-sjc sjControlInterface, recorder record.EventRecorder) {
+func cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
+cjc cjControlInterface, recorder record.EventRecorder) {
 // If neither limits are active, there is no need to do anything.
-if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
+if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {
 return
 }
@@ -169,107 +168,107 @@ func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobContr
 }
 }
-if sj.Spec.SuccessfulJobsHistoryLimit != nil {
-removeOldestJobs(sj,
+if cj.Spec.SuccessfulJobsHistoryLimit != nil {
+removeOldestJobs(cj,
 successfulJobs,
 jc,
-*sj.Spec.SuccessfulJobsHistoryLimit,
+*cj.Spec.SuccessfulJobsHistoryLimit,
 recorder)
 }
-if sj.Spec.FailedJobsHistoryLimit != nil {
-removeOldestJobs(sj,
+if cj.Spec.FailedJobsHistoryLimit != nil {
+removeOldestJobs(cj,
 failedJobs,
 jc,
-*sj.Spec.FailedJobsHistoryLimit,
+*cj.Spec.FailedJobsHistoryLimit,
 recorder)
 }
 // Update the CronJob, in case jobs were removed from the list.
-if _, err := sjc.UpdateStatus(sj); err != nil {
-nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
-klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
+if _, err := cjc.UpdateStatus(cj); err != nil {
+nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
+klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
 }
 }
 // removeOldestJobs removes the oldest jobs from a list of jobs
-func removeOldestJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
+func removeOldestJobs(cj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface, maxJobs int32, recorder record.EventRecorder) {
 numToDelete := len(js) - int(maxJobs)
 if numToDelete <= 0 {
 return
 }
-nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
+nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
 klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
 sort.Sort(byJobStartTime(js))
 for i := 0; i < numToDelete; i++ {
 klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
-deleteJob(sj, &js[i], jc, recorder)
+deleteJob(cj, &js[i], jc, recorder)
 }
 }
 // syncOne reconciles a CronJob with a list of any Jobs that it created.
-// All known jobs created by "sj" should be included in "js".
+// All known jobs created by "cj" should be included in "js".
 // The current time is passed in to facilitate testing.
 // It has no receiver, to facilitate testing.
-func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
-nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
+func syncOne(cj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, cjc cjControlInterface, recorder record.EventRecorder) {
+nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
 childrenJobs := make(map[types.UID]bool)
 for _, j := range js {
 childrenJobs[j.ObjectMeta.UID] = true
-found := inActiveList(*sj, j.ObjectMeta.UID)
+found := inActiveList(*cj, j.ObjectMeta.UID)
 if !found && !IsJobFinished(&j) {
-recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
+recorder.Eventf(cj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
 // We found an unfinished job that has us as the parent, but it is not in our Active list.
 // This could happen if we crashed right after creating the Job and before updating the status,
-// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
+// or if our jobs list is newer than our cj status after a relist, or if someone intentionally created
 // a job that they wanted us to adopt.
 // TODO: maybe handle the adoption case? Concurrency/suspend rules will not apply in that case, obviously, since we can't
 // stop users from creating jobs if they have permission. It is assumed that if a
-// user has permission to create a job within a namespace, then they have permission to make any scheduledJob
+// user has permission to create a job within a namespace, then they have permission to make any cronJob
 // in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
-// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
+// TBS: how to update cj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
 } else if found && IsJobFinished(&j) {
 _, status := getFinishedStatus(&j)
-deleteFromActiveList(sj, j.ObjectMeta.UID)
-recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
+deleteFromActiveList(cj, j.ObjectMeta.UID)
+recorder.Eventf(cj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
 }
 }
 // Remove any job reference from the active list if the corresponding job does not exist any more.
 // Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
 // job running.
-for _, j := range sj.Status.Active {
+for _, j := range cj.Status.Active {
 if found := childrenJobs[j.UID]; !found {
-recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
-deleteFromActiveList(sj, j.UID)
+recorder.Eventf(cj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
+deleteFromActiveList(cj, j.UID)
 }
 }
-updatedSJ, err := sjc.UpdateStatus(sj)
+updatedCJ, err := cjc.UpdateStatus(cj)
 if err != nil {
-klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
+klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
 return
 }
-*sj = *updatedSJ
-if sj.DeletionTimestamp != nil {
+*cj = *updatedCJ
+if cj.DeletionTimestamp != nil {
 // The CronJob is being deleted.
 // Don't do anything other than updating status.
 return
 }
-if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
+if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
 klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
 return
 }
-times, err := getRecentUnmetScheduleTimes(*sj, now)
+times, err := getRecentUnmetScheduleTimes(*cj, now)
 if err != nil {
-recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
+recorder.Eventf(cj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
 klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
 return
 }
@@ -284,22 +283,22 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
 scheduledTime := times[len(times)-1]
 tooLate := false
-if sj.Spec.StartingDeadlineSeconds != nil {
-tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
+if cj.Spec.StartingDeadlineSeconds != nil {
+tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
 }
 if tooLate {
 klog.V(4).Infof("Missed starting window for %s", nameForLog)
-recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
+recorder.Eventf(cj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
 // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
 // the miss every cycle. In order to avoid sending multiple events, and to avoid processing
-// the sj again and again, we could set a Status.LastMissedTime when we notice a miss.
+// the cj again and again, we could set a Status.LastMissedTime when we notice a miss.
 // Then, when we call getRecentUnmetScheduleTimes, we can take max(creationTimestamp,
 // Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
 // and event the next time we process it, and also so the user looking at the status
 // can see easily that there was a missed execution.
 return
 }
-if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
+if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 {
 // Regardless which source of information we use for the set of active jobs,
 // there is some risk that we won't see an active job when there is one.
 // (because we haven't seen the status update to the SJ or the created pod).
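Note (illustration, not part of the diff): a small worked example of the StartingDeadlineSeconds check in the hunk above, using the same expression; the timestamps are made up:

package main

import (
    "fmt"
    "time"
)

func main() {
    scheduledTime := time.Date(2020, 5, 19, 10, 0, 0, 0, time.UTC) // nominal start time
    var startingDeadlineSeconds int64 = 200                        // 200-second starting window
    now := time.Date(2020, 5, 19, 10, 5, 0, 0, time.UTC)

    tooLate := scheduledTime.Add(time.Second * time.Duration(startingDeadlineSeconds)).Before(now)
    fmt.Println(tooLate) // true: the window closed at 10:03:20, and it is now 10:05:00
}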
@@ -312,37 +311,37 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
 klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
 return
 }
-if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
-for _, j := range sj.Status.Active {
+if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
+for _, j := range cj.Status.Active {
 klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
 job, err := jc.GetJob(j.Namespace, j.Name)
 if err != nil {
-recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
+recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
 return
 }
-if !deleteJob(sj, job, jc, recorder) {
+if !deleteJob(cj, job, jc, recorder) {
 return
 }
 }
 }
-jobReq, err := getJobFromTemplate(sj, scheduledTime)
+jobReq, err := getJobFromTemplate(cj, scheduledTime)
 if err != nil {
 klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
 return
 }
-jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
+jobResp, err := jc.CreateJob(cj.Namespace, jobReq)
 if err != nil {
 // If the namespace is being torn down, we can safely ignore
 // this error since all subsequent creations will fail.
 if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
-recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
+recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
 }
 return
 }
 klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
-recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
+recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
 // ------------------------------------------------------------------ //
@@ -359,29 +358,29 @@ func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobCo
 if err != nil {
 klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
 } else {
-sj.Status.Active = append(sj.Status.Active, *ref)
+cj.Status.Active = append(cj.Status.Active, *ref)
 }
-sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
-if _, err := sjc.UpdateStatus(sj); err != nil {
-klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
+cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
+if _, err := cjc.UpdateStatus(cj); err != nil {
+klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err)
 }
 return
 }
 // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
-func deleteJob(sj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
-nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
+func deleteJob(cj *batchv1beta1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
+nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
 // delete the job itself...
 if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
-recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
+recorder.Eventf(cj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
 klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
 return false
 }
 // ... and its reference from active list
-deleteFromActiveList(sj, job.ObjectMeta.UID)
-recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
+deleteFromActiveList(cj, job.ObjectMeta.UID)
+recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
 return true
 }


@@ -179,13 +179,13 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 }
 testCases := map[string]struct {
-// sj spec
+// cj spec
 concurrencyPolicy batchV1beta1.ConcurrencyPolicy
 suspend bool
 schedule string
 deadline int64
-// sj status
+// cj status
 ranPreviously bool
 stillActive bool
@@ -253,12 +253,12 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 name := name
 tc := tc
 t.Run(name, func(t *testing.T) {
-sj := cronJob()
-sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
-sj.Spec.Suspend = &tc.suspend
-sj.Spec.Schedule = tc.schedule
+cj := cronJob()
+cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
+cj.Spec.Suspend = &tc.suspend
+cj.Spec.Schedule = tc.schedule
 if tc.deadline != noDead {
-sj.Spec.StartingDeadlineSeconds = &tc.deadline
+cj.Spec.StartingDeadlineSeconds = &tc.deadline
 }
 var (
@@ -267,30 +267,30 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 )
 js := []batchv1.Job{}
 if tc.ranPreviously {
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
-sj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
-job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time)
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
+cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
+job, err = getJobFromTemplate(&cj, cj.Status.LastScheduleTime.Time)
 if err != nil {
 t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
 }
 job.UID = "1234"
 job.Namespace = ""
 if tc.stillActive {
-sj.Status.Active = []v1.ObjectReference{{UID: job.UID}}
+cj.Status.Active = []v1.ObjectReference{{UID: job.UID}}
 js = append(js, *job)
 }
 } else {
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
 if tc.stillActive {
 t.Errorf("%s: test setup error: this case makes no sense", name)
 }
 }
 jc := &fakeJobControl{Job: job}
-sjc := &fakeSJControl{}
+cjc := &fakeCJControl{}
 recorder := record.NewFakeRecorder(10)
-syncOne(&sj, js, tc.now, jc, sjc, recorder)
+syncOne(&cj, js, tc.now, jc, cjc, recorder)
 expectedCreates := 0
 if tc.expectCreate {
 expectedCreates = 1
@@ -310,10 +310,10 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 if got, want := controllerRef.Kind, "CronJob"; got != want {
 t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want)
 }
-if got, want := controllerRef.Name, sj.Name; got != want {
+if got, want := controllerRef.Name, cj.Name; got != want {
 t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want)
 }
-if got, want := controllerRef.UID, sj.UID; got != want {
+if got, want := controllerRef.UID, cj.UID; got != want {
 t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want)
 }
 if controllerRef.Controller == nil || *controllerRef.Controller != true {
@@ -357,8 +357,8 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 t.Errorf("%s: expected %d warnings, actually %v", name, tc.expectedWarnings, numWarnings)
 }
-if tc.expectActive != len(sjc.Updates[expectUpdates-1].Status.Active) {
-t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(sjc.Updates[expectUpdates-1].Status.Active))
+if tc.expectActive != len(cjc.Updates[expectUpdates-1].Status.Active) {
+t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(cjc.Updates[expectUpdates-1].Status.Active))
 }
 })
 }
@@ -493,14 +493,14 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 name := name
 tc := tc
 t.Run(name, func(t *testing.T) {
-sj := cronJob()
+cj := cronJob()
 suspend := false
-sj.Spec.ConcurrencyPolicy = f
-sj.Spec.Suspend = &suspend
-sj.Spec.Schedule = onTheHour
-sj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit
-sj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit
+cj.Spec.ConcurrencyPolicy = f
+cj.Spec.Suspend = &suspend
+cj.Spec.Schedule = onTheHour
+cj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit
+cj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit
 var (
 job *batchv1.Job
@@ -511,19 +511,19 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 if len(tc.jobSpecs) != 0 {
 firstTime := startTimeStringToTime(tc.jobSpecs[0].StartTime)
 lastTime := startTimeStringToTime(tc.jobSpecs[len(tc.jobSpecs)-1].StartTime)
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime}
-sj.Status.LastScheduleTime = &metav1.Time{Time: lastTime}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime}
+cj.Status.LastScheduleTime = &metav1.Time{Time: lastTime}
 } else {
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
 }
 // Create jobs
 js := []batchv1.Job{}
 jobsToDelete := sets.NewString()
-sj.Status.Active = []v1.ObjectReference{}
+cj.Status.Active = []v1.ObjectReference{}
 for i, spec := range tc.jobSpecs {
-job, err = getJobFromTemplate(&sj, startTimeStringToTime(spec.StartTime))
+job, err = getJobFromTemplate(&cj, startTimeStringToTime(spec.StartTime))
 if err != nil {
 t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
 }
@@ -542,13 +542,13 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 job.Status.Conditions = append(job.Status.Conditions, condition)
 if spec.IsStillInActiveList {
-sj.Status.Active = append(sj.Status.Active, v1.ObjectReference{UID: job.UID})
+cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID})
 }
 } else {
 if spec.IsSuccessful || spec.IsStillInActiveList {
 t.Errorf("%s: test setup error: this case makes no sense", name)
 }
-sj.Status.Active = append(sj.Status.Active, v1.ObjectReference{UID: job.UID})
+cj.Status.Active = append(cj.Status.Active, v1.ObjectReference{UID: job.UID})
 }
 js = append(js, *job)
@@ -558,10 +558,10 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 }
 jc := &fakeJobControl{Job: job}
-sjc := &fakeSJControl{}
+cjc := &fakeCJControl{}
 recorder := record.NewFakeRecorder(10)
-cleanupFinishedJobs(&sj, js, jc, sjc, recorder)
+cleanupFinishedJobs(&cj, js, jc, cjc, recorder)
 // Check we have actually deleted the correct jobs
 if len(jc.DeleteJobName) != len(jobsToDelete) {
@@ -584,8 +584,8 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 // Check for jobs still in active list
 numActive := 0
-if len(sjc.Updates) != 0 {
-numActive = len(sjc.Updates[len(sjc.Updates)-1].Status.Active)
+if len(cjc.Updates) != 0 {
+numActive = len(cjc.Updates[len(cjc.Updates)-1].Status.Active)
 }
 if tc.expectActive != numActive {
 t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, numActive)
@@ -597,7 +597,7 @@ func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
 // TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
 // but over time, all jobs run as expected (assuming Allow and no deadline).
-// TestSyncOne_Status tests sj.UpdateStatus in syncOne
+// TestSyncOne_Status tests cj.UpdateStatus in syncOne
 func TestSyncOne_Status(t *testing.T) {
 finishedJob := newJob("1")
 finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batchv1.JobCondition{Type: batchv1.JobComplete, Status: v1.ConditionTrue})
@@ -605,13 +605,13 @@ func TestSyncOne_Status(t *testing.T) {
 missingJob := newJob("3")
 testCases := map[string]struct {
-// sj spec
+// cj spec
 concurrencyPolicy batchV1beta1.ConcurrencyPolicy
 suspend bool
 schedule string
 deadline int64
-// sj status
+// cj status
 ranPreviously bool
 hasFinishedJob bool
@@ -681,21 +681,21 @@ func TestSyncOne_Status(t *testing.T) {
 tc := tc
 t.Run(name, func(t *testing.T) {
 // Setup the test
-sj := cronJob()
-sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
-sj.Spec.Suspend = &tc.suspend
-sj.Spec.Schedule = tc.schedule
+cj := cronJob()
+cj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
+cj.Spec.Suspend = &tc.suspend
+cj.Spec.Schedule = tc.schedule
 if tc.deadline != noDead {
-sj.Spec.StartingDeadlineSeconds = &tc.deadline
+cj.Spec.StartingDeadlineSeconds = &tc.deadline
 }
 if tc.ranPreviously {
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
-sj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
+cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
 } else {
 if tc.hasFinishedJob || tc.hasUnexpectedJob || tc.hasMissingJob {
 t.Errorf("%s: test setup error: this case makes no sense", name)
 }
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
 }
 jobs := []batchv1.Job{}
 if tc.hasFinishedJob {
@@ -703,7 +703,7 @@ func TestSyncOne_Status(t *testing.T) {
 if err != nil {
 t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
 }
-sj.Status.Active = []v1.ObjectReference{*ref}
+cj.Status.Active = []v1.ObjectReference{*ref}
 jobs = append(jobs, finishedJob)
 }
 if tc.hasUnexpectedJob {
@@ -714,19 +714,19 @@ func TestSyncOne_Status(t *testing.T) {
 if err != nil {
 t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
 }
-sj.Status.Active = append(sj.Status.Active, *ref)
+cj.Status.Active = append(cj.Status.Active, *ref)
 }
 if tc.beingDeleted {
 timestamp := metav1.NewTime(tc.now)
-sj.DeletionTimestamp = &timestamp
+cj.DeletionTimestamp = &timestamp
 }
 jc := &fakeJobControl{}
-sjc := &fakeSJControl{}
+cjc := &fakeCJControl{}
 recorder := record.NewFakeRecorder(10)
 // Run the code
-syncOne(&sj, jobs, tc.now, jc, sjc, recorder)
+syncOne(&cj, jobs, tc.now, jc, cjc, recorder)
 // Status update happens once when ranging through job list, and another one if create jobs.
 expectUpdates := 1
@@ -753,24 +753,24 @@ func TestSyncOne_Status(t *testing.T) {
 t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events)
 }
-if expectUpdates != len(sjc.Updates) {
-t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(sjc.Updates))
+if expectUpdates != len(cjc.Updates) {
+t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(cjc.Updates))
 }
-if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) {
-t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
+if tc.hasFinishedJob && inActiveList(cjc.Updates[0], finishedJob.UID) {
+t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
 }
-if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) {
-t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
+if tc.hasUnexpectedJob && inActiveList(cjc.Updates[0], unexpectedJob.UID) {
+t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
 }
-if tc.hasMissingJob && inActiveList(sjc.Updates[0], missingJob.UID) {
-t.Errorf("%s: expected missing job to be removed from active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
+if tc.hasMissingJob && inActiveList(cjc.Updates[0], missingJob.UID) {
+t.Errorf("%s: expected missing job to be removed from active list, actually active list = %#v", name, cjc.Updates[0].Status.Active)
 }
-if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) {
-t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime)
+if tc.expectCreate && !cjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) {
+t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), cjc.Updates[1].Status.LastScheduleTime)
 }
 })
 }


@@ -31,33 +31,33 @@ import (
 "k8s.io/client-go/tools/record"
 )
-// sjControlInterface is an interface that knows how to update CronJob status
+// cjControlInterface is an interface that knows how to update CronJob status
 // created as an interface to allow testing.
-type sjControlInterface interface {
-UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error)
+type cjControlInterface interface {
+UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error)
 }
-// realSJControl is the default implementation of sjControlInterface.
-type realSJControl struct {
+// realCJControl is the default implementation of cjControlInterface.
+type realCJControl struct {
 KubeClient clientset.Interface
 }
-var _ sjControlInterface = &realSJControl{}
-func (c *realSJControl) UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
-return c.KubeClient.BatchV1beta1().CronJobs(sj.Namespace).UpdateStatus(context.TODO(), sj, metav1.UpdateOptions{})
+var _ cjControlInterface = &realCJControl{}
+func (c *realCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
+return c.KubeClient.BatchV1beta1().CronJobs(cj.Namespace).UpdateStatus(context.TODO(), cj, metav1.UpdateOptions{})
 }
-// fakeSJControl is the default implementation of sjControlInterface.
-type fakeSJControl struct {
+// fakeCJControl is the default implementation of cjControlInterface.
+type fakeCJControl struct {
 Updates []batchv1beta1.CronJob
 }
-var _ sjControlInterface = &fakeSJControl{}
-func (c *fakeSJControl) UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
-c.Updates = append(c.Updates, *sj)
-return sj, nil
+var _ cjControlInterface = &fakeCJControl{}
+func (c *fakeCJControl) UpdateStatus(cj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error) {
+c.Updates = append(c.Updates, *cj)
+return cj, nil
 }
 // ------------------------------------------------------------------ //


@@ -32,8 +32,8 @@ import (
 // Utilities for dealing with Jobs and CronJobs and time.
-func inActiveList(sj batchv1beta1.CronJob, uid types.UID) bool {
-for _, j := range sj.Status.Active {
+func inActiveList(cj batchv1beta1.CronJob, uid types.UID) bool {
+for _, j := range cj.Status.Active {
 if j.UID == uid {
 return true
 }
@@ -41,17 +41,17 @@ func inActiveList(sj batchv1beta1.CronJob, uid types.UID) bool {
 return false
 }
-func deleteFromActiveList(sj *batchv1beta1.CronJob, uid types.UID) {
-if sj == nil {
+func deleteFromActiveList(cj *batchv1beta1.CronJob, uid types.UID) {
+if cj == nil {
 return
 }
 newActive := []v1.ObjectReference{}
-for _, j := range sj.Status.Active {
+for _, j := range cj.Status.Active {
 if j.UID != uid {
 newActive = append(newActive, j)
 }
 }
-sj.Status.Active = newActive
+cj.Status.Active = newActive
 }
 // getParentUIDFromJob extracts UID of job's parent and whether it was found
@@ -70,47 +70,47 @@ func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) {
 return controllerRef.UID, true
 }
-// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. scheduledJob).
+// groupJobsByParent groups jobs into a map keyed by the job parent UID (e.g. cronJob).
 // It has no receiver, to facilitate testing.
 func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
-jobsBySj := make(map[types.UID][]batchv1.Job)
+jobsByCj := make(map[types.UID][]batchv1.Job)
 for _, job := range js {
 parentUID, found := getParentUIDFromJob(job)
 if !found {
 klog.V(4).Infof("Unable to get parent uid from job %s in namespace %s", job.Name, job.Namespace)
 continue
 }
-jobsBySj[parentUID] = append(jobsBySj[parentUID], job)
+jobsByCj[parentUID] = append(jobsByCj[parentUID], job)
 }
-return jobsBySj
+return jobsByCj
 }
 // getRecentUnmetScheduleTimes gets a slice of times (from oldest to latest) that have passed when a Job should have started but did not.
 //
 // If there are too many (>100) unstarted times, just give up and return an empty slice.
 // If there were missed times prior to the last known start time, then those are not returned.
-func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
+func getRecentUnmetScheduleTimes(cj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
 starts := []time.Time{}
-sched, err := cron.ParseStandard(sj.Spec.Schedule)
+sched, err := cron.ParseStandard(cj.Spec.Schedule)
 if err != nil {
-return starts, fmt.Errorf("unparseable schedule: %s : %s", sj.Spec.Schedule, err)
+return starts, fmt.Errorf("unparseable schedule: %s : %s", cj.Spec.Schedule, err)
 }
 var earliestTime time.Time
-if sj.Status.LastScheduleTime != nil {
-earliestTime = sj.Status.LastScheduleTime.Time
+if cj.Status.LastScheduleTime != nil {
+earliestTime = cj.Status.LastScheduleTime.Time
 } else {
-// If none found, then this is either a recently created scheduledJob,
+// If none found, then this is either a recently created cronJob,
 // or the active/completed info was somehow lost (contract for status
 // in kubernetes says it may need to be recreated), or that we have
 // started a job, but have not noticed it yet (distributed systems can
 // have arbitrary delays). In any case, use the creation time of the
 // CronJob as last known start time.
-earliestTime = sj.ObjectMeta.CreationTimestamp.Time
+earliestTime = cj.ObjectMeta.CreationTimestamp.Time
 }
-if sj.Spec.StartingDeadlineSeconds != nil {
+if cj.Spec.StartingDeadlineSeconds != nil {
 // Controller is not going to schedule anything below this point
-schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))
+schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
 if schedulingDeadline.After(earliestTime) {
 earliestTime = schedulingDeadline
@@ -126,8 +126,8 @@ func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time
 // controller gets wedged on friday at 5:01pm when everyone has
 // gone home, and someone comes in on tuesday AM and discovers
 // the problem and restarts the controller, then all the hourly
-// jobs, more than 80 of them for one hourly scheduledJob, should
-// all start running with no further intervention (if the scheduledJob
+// jobs, more than 80 of them for one hourly cronJob, should
+// all start running with no further intervention (if the cronJob
 // allows concurrency and late starts).
 //
 // However, if there is a bug somewhere, or incorrect clock
@@ -148,21 +148,21 @@ func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time
 }
 }
 // getJobFromTemplate makes a Job from a CronJob
-func getJobFromTemplate(sj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
-labels := copyLabels(&sj.Spec.JobTemplate)
-annotations := copyAnnotations(&sj.Spec.JobTemplate)
+func getJobFromTemplate(cj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
+labels := copyLabels(&cj.Spec.JobTemplate)
+annotations := copyAnnotations(&cj.Spec.JobTemplate)
 // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
-name := fmt.Sprintf("%s-%d", sj.Name, getTimeHash(scheduledTime))
+name := fmt.Sprintf("%s-%d", cj.Name, getTimeHash(scheduledTime))
 job := &batchv1.Job{
 ObjectMeta: metav1.ObjectMeta{
 Labels: labels,
 Annotations: annotations,
 Name: name,
-OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(sj, controllerKind)},
+OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(cj, controllerKind)},
 },
 }
-sj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
+cj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
 return job, nil
 }
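Note (illustration, not part of the diff): a rough standalone sketch of the schedule walk that getRecentUnmetScheduleTimes performs, built on the same cron.ParseStandard call shown above (from the robfig/cron package); the schedule, the timestamps, and the handling of the 100-start cap here are simplified assumptions, not the helper's exact behavior:

package main

import (
    "fmt"
    "time"

    "github.com/robfig/cron"
)

func main() {
    sched, err := cron.ParseStandard("0 * * * *") // hourly, on the hour
    if err != nil {
        panic(err)
    }
    earliest := time.Date(2020, 5, 19, 9, 30, 0, 0, time.UTC) // e.g. LastScheduleTime or creation time
    now := time.Date(2020, 5, 19, 12, 10, 0, 0, time.UTC)

    var starts []time.Time
    for t := sched.Next(earliest); !t.After(now); t = sched.Next(t) {
        starts = append(starts, t)
        if len(starts) > 100 { // the real helper gives up here and reports an error
            break
        }
    }
    fmt.Println(starts) // 10:00, 11:00 and 12:00 on 2020-05-19
}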


@@ -38,7 +38,7 @@ func TestGetJobFromTemplate(t *testing.T) {
 var one int64 = 1
 var no bool
-sj := batchv1beta1.CronJob{
+cj := batchv1beta1.CronJob{
 ObjectMeta: metav1.ObjectMeta{
 Name: "mycronjob",
 Namespace: "snazzycats",
@@ -74,7 +74,7 @@ func TestGetJobFromTemplate(t *testing.T) {
 }
 var job *batchv1.Job
-job, err := getJobFromTemplate(&sj, time.Time{})
+job, err := getJobFromTemplate(&cj, time.Time{})
 if err != nil {
 t.Errorf("Did not expect error: %s", err)
 }
@@ -173,10 +173,10 @@ func TestGroupJobsByParent(t *testing.T) {
 }
 {
-// Case 1: There are no jobs and scheduledJobs
+// Case 1: There are no jobs and cronJobs
 js := []batchv1.Job{}
-jobsBySj := groupJobsByParent(js)
-if len(jobsBySj) != 0 {
+jobsByCj := groupJobsByParent(js)
+if len(jobsByCj) != 0 {
 t.Errorf("Wrong number of items in map")
 }
 }
@@ -256,7 +256,7 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 t.Errorf("test setup error: %v", err)
 }
-sj := batchv1beta1.CronJob{
+cj := batchv1beta1.CronJob{
 ObjectMeta: metav1.ObjectMeta{
 Name: "mycronjob",
 Namespace: metav1.NamespaceDefault,
@@ -271,10 +271,10 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 {
 // Case 1: no known start times, and none needed yet.
 // Creation time is before T1.
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
 // Current time is more than creation time, but less than T1.
 now := T1.Add(-7 * time.Minute)
-times, err := getRecentUnmetScheduleTimes(sj, now)
+times, err := getRecentUnmetScheduleTimes(cj, now)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
 }
@@ -285,10 +285,10 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 {
 // Case 2: no known start times, and one needed.
 // Creation time is before T1.
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
 // Current time is after T1
 now := T1.Add(2 * time.Second)
-times, err := getRecentUnmetScheduleTimes(sj, now)
+times, err := getRecentUnmetScheduleTimes(cj, now)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
 }
@@ -301,12 +301,12 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 {
 // Case 3: known LastScheduleTime, no start needed.
 // Creation time is before T1.
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
 // Status shows a start at the expected time.
-sj.Status.LastScheduleTime = &metav1.Time{Time: T1}
+cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
 // Current time is after T1
 now := T1.Add(2 * time.Minute)
-times, err := getRecentUnmetScheduleTimes(sj, now)
+times, err := getRecentUnmetScheduleTimes(cj, now)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
 }
@@ -317,12 +317,12 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 {
 // Case 4: known LastScheduleTime, a start needed
 // Creation time is before T1.
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
 // Status shows a start at the expected time.
-sj.Status.LastScheduleTime = &metav1.Time{Time: T1}
+cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
 // Current time is after T1 and after T2
 now := T2.Add(5 * time.Minute)
-times, err := getRecentUnmetScheduleTimes(sj, now)
+times, err := getRecentUnmetScheduleTimes(cj, now)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
 }
@@ -334,11 +334,11 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 }
 {
 // Case 5: known LastScheduleTime, two starts needed
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
-sj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
+cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
 // Current time is after T1 and after T2
 now := T2.Add(5 * time.Minute)
-times, err := getRecentUnmetScheduleTimes(sj, now)
+times, err := getRecentUnmetScheduleTimes(cj, now)
 if err != nil {
 t.Errorf("unexpected error: %v", err)
 }
@@ -355,23 +355,23 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
 }
 {
 // Case 6: now is way way ahead of last start time, and there is no deadline.
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
-sj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
+cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
 now := T2.Add(10 * 24 * time.Hour)
-_, err := getRecentUnmetScheduleTimes(sj, now)
+_, err := getRecentUnmetScheduleTimes(cj, now)
 if err == nil {
 t.Errorf("expected an error")
 }
 }
 {
 // Case 7: now is way way ahead of last start time, but there is a short deadline.
-sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
-sj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
+cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
+cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
 now := T2.Add(10 * 24 * time.Hour)
 // Deadline is short
 deadline := int64(2 * 60 * 60)
-sj.Spec.StartingDeadlineSeconds = &deadline
-_, err := getRecentUnmetScheduleTimes(sj, now)
+cj.Spec.StartingDeadlineSeconds = &deadline
+_, err := getRecentUnmetScheduleTimes(cj, now)
 if err != nil {
 t.Errorf("unexpected error")
 }


@@ -42,7 +42,7 @@ func TestCronJobStrategy(t *testing.T) {
 Containers: []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: api.TerminationMessageReadFile}},
 },
 }
-scheduledJob := &batch.CronJob{
+cronJob := &batch.CronJob{
 ObjectMeta: metav1.ObjectMeta{
 Name: "mycronjob",
 Namespace: metav1.NamespaceDefault,
@@ -58,11 +58,11 @@ func TestCronJobStrategy(t *testing.T) {
 },
 }
-Strategy.PrepareForCreate(ctx, scheduledJob)
-if len(scheduledJob.Status.Active) != 0 {
+Strategy.PrepareForCreate(ctx, cronJob)
+if len(cronJob.Status.Active) != 0 {
 t.Errorf("CronJob does not allow setting status on create")
 }
-errs := Strategy.Validate(ctx, scheduledJob)
+errs := Strategy.Validate(ctx, cronJob)
 if len(errs) != 0 {
 t.Errorf("Unexpected error validating %v", errs)
 }
@@ -78,11 +78,11 @@ func TestCronJobStrategy(t *testing.T) {
 }
 // ensure we do not change status
-Strategy.PrepareForUpdate(ctx, updatedCronJob, scheduledJob)
+Strategy.PrepareForUpdate(ctx, updatedCronJob, cronJob)
 if updatedCronJob.Status.Active != nil {
 t.Errorf("PrepareForUpdate should have preserved prior version status")
 }
-errs = Strategy.ValidateUpdate(ctx, updatedCronJob, scheduledJob)
+errs = Strategy.ValidateUpdate(ctx, updatedCronJob, cronJob)
 if len(errs) == 0 {
 t.Errorf("Expected a validation error")
 }
@@ -165,10 +165,10 @@ func TestCronJobStatusStrategy(t *testing.T) {
 StatusStrategy.PrepareForUpdate(ctx, newCronJob, oldCronJob)
 if newCronJob.Status.LastScheduleTime == nil {
-t.Errorf("CronJob status updates must allow changes to scheduledJob status")
+t.Errorf("CronJob status updates must allow changes to cronJob status")
 }
 if newCronJob.Spec.Schedule != oldSchedule {
-t.Errorf("CronJob status updates must now allow changes to scheduledJob spec")
+t.Errorf("CronJob status updates must now allow changes to cronJob spec")
 }
 errs := StatusStrategy.ValidateUpdate(ctx, newCronJob, oldCronJob)
 if len(errs) != 0 {