Update pkg/controller/cronjob/ for contextual logging

This commit is contained in:
Mengjiao Liu 2022-10-28 16:36:36 +08:00
parent d7cb1c54a5
commit 6f2cd1b5bd
6 changed files with 76 additions and 61 deletions

View File

@ -23,6 +23,8 @@ import (
"context" "context"
"fmt" "fmt"
"k8s.io/klog/v2"
"k8s.io/controller-manager/controller" "k8s.io/controller-manager/controller"
"k8s.io/kubernetes/pkg/controller/cronjob" "k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job"
@ -38,8 +40,8 @@ func startJobController(ctx context.Context, controllerContext ControllerContext
} }
func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "cronjob-controller"))
cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(), cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
controllerContext.InformerFactory.Batch().V1().CronJobs(), controllerContext.InformerFactory.Batch().V1().CronJobs(),
controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"), controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
) )

View File

@ -82,7 +82,8 @@ type ControllerV2 struct {
} }
// NewControllerV2 creates and initializes a new Controller. // NewControllerV2 creates and initializes a new Controller.
func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) { func NewControllerV2(ctx context.Context, jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
logger := klog.FromContext(ctx)
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
jm := &ControllerV2{ jm := &ControllerV2{
@ -112,7 +113,9 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
jm.enqueueController(obj) jm.enqueueController(obj)
}, },
UpdateFunc: jm.updateCronJob, UpdateFunc: func(oldObj, newObj interface{}) {
jm.updateCronJob(logger, oldObj, newObj)
},
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
jm.enqueueController(obj) jm.enqueueController(obj)
}, },
@ -134,8 +137,9 @@ func (jm *ControllerV2) Run(ctx context.Context, workers int) {
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
klog.InfoS("Starting cronjob controller v2") logger := klog.FromContext(ctx)
defer klog.InfoS("Shutting down cronjob controller v2") logger.Info("Starting cronjob controller v2")
defer logger.Info("Shutting down cronjob controller v2")
if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) { if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) {
return return
@ -177,12 +181,12 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
if err != nil { if err != nil {
return nil, err return nil, err
} }
logger := klog.FromContext(ctx)
cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name) cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
switch { switch {
case errors.IsNotFound(err): case errors.IsNotFound(err):
// the cronjob may have been deleted; no need to requeue this key // the cronjob may have been deleted; no need to requeue this key
klog.V(4).InfoS("CronJob not found, may be it is deleted", "cronjob", klog.KRef(ns, name), "err", err) logger.V(4).Info("CronJob not found, may be it is deleted", "cronjob", klog.KObj(cronJob), "err", err)
return nil, nil return nil, nil
case err != nil: case err != nil:
// for other transient apiserver error requeue with exponential backoff // for other transient apiserver error requeue with exponential backoff
@ -196,10 +200,10 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled) cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
if err != nil { if err != nil {
klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", err)
if updateStatus { if updateStatus {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil { if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.ResourceVersion, "err", err) logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err return nil, err
} }
} }
@ -213,13 +217,13 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
// Update the CronJob if needed // Update the CronJob if needed
if updateStatus { if updateStatus {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil { if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.ResourceVersion, "err", err) logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err return nil, err
} }
} }
if requeueAfter != nil { if requeueAfter != nil {
klog.V(4).InfoS("Re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter) logger.V(4).Info("Re-queuing cronjob", "cronjob", klog.KObj(cronJob), "requeueAfter", requeueAfter)
return requeueAfter, nil return requeueAfter, nil
} }
// this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format // this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
@ -375,7 +379,7 @@ func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration)
// updateCronJob re-queues the CronJob for next scheduled time if there is a // updateCronJob re-queues the CronJob for next scheduled time if there is a
// change in spec.schedule otherwise it re-queues it now // change in spec.schedule otherwise it re-queues it now
func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) { func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr interface{}) {
timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone) timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone)
oldCJ, okOld := old.(*batchv1.CronJob) oldCJ, okOld := old.(*batchv1.CronJob)
newCJ, okNew := curr.(*batchv1.CronJob) newCJ, okNew := curr.(*batchv1.CronJob)
@ -393,7 +397,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
if err != nil { if err != nil {
// this is likely a user error in defining the spec value // this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec // we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("Unparseable schedule for cronjob", "cronjob", klog.KRef(newCJ.GetNamespace(), newCJ.GetName()), "schedule", newCJ.Spec.Schedule, "err", err) logger.V(2).Info("Unparseable schedule for cronjob", "cronjob", klog.KObj(newCJ), "schedule", newCJ.Spec.Schedule, "err", err)
jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule) jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)
return return
} }
@ -494,17 +498,18 @@ func (jm *ControllerV2) syncCronJob(
return cronJob, nil, updateStatus, nil return cronJob, nil, updateStatus, nil
} }
logger := klog.FromContext(ctx)
if timeZoneEnabled && cronJob.Spec.TimeZone != nil { if timeZoneEnabled && cronJob.Spec.TimeZone != nil {
if _, err := time.LoadLocation(*cronJob.Spec.TimeZone); err != nil { if _, err := time.LoadLocation(*cronJob.Spec.TimeZone); err != nil {
timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "") timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
klog.V(4).InfoS("Not starting job because timeZone is invalid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "timeZone", timeZone, "err", err) logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
return cronJob, nil, updateStatus, nil return cronJob, nil, updateStatus, nil
} }
} }
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend { if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob))
return cronJob, nil, updateStatus, nil return cronJob, nil, updateStatus, nil
} }
@ -512,16 +517,16 @@ func (jm *ControllerV2) syncCronJob(
if err != nil { if err != nil {
// this is likely a user error in defining the spec value // this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec // we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err) logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
return cronJob, nil, updateStatus, nil return cronJob, nil, updateStatus, nil
} }
scheduledTime, err := nextScheduleTime(cronJob, now, sched, jm.recorder) scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
if err != nil { if err != nil {
// this is likely a user error in defining the spec value // this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec // we should log the error and not reconcile this cronjob until an update to spec
klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err) logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
return cronJob, nil, updateStatus, nil return cronJob, nil, updateStatus, nil
} }
@ -530,7 +535,7 @@ func (jm *ControllerV2) syncCronJob(
// The only time this should happen is if queue is filled after restart. // The only time this should happen is if queue is filled after restart.
// Otherwise, the queue is always supposed to trigger the sync function at the time of // Otherwise, the queue is always supposed to trigger the sync function at the time of
// the scheduled time, which will give at least 1 unmet time schedule // the scheduled time, which will give at least 1 unmet time schedule
klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
t := nextScheduleTimeDuration(cronJob, now, sched) t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil return cronJob, t, updateStatus, nil
} }
@ -540,7 +545,7 @@ func (jm *ControllerV2) syncCronJob(
tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now) tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
} }
if tooLate { if tooLate {
klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) logger.V(4).Info("Missed starting window", "cronjob", klog.KObj(cronJob))
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z)) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing // TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
@ -558,7 +563,7 @@ func (jm *ControllerV2) syncCronJob(
Name: getJobName(cronJob, *scheduledTime), Name: getJobName(cronJob, *scheduledTime),
Namespace: cronJob.Namespace, Namespace: cronJob.Namespace,
}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) { }}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", scheduledTime) logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
t := nextScheduleTimeDuration(cronJob, now, sched) t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil return cronJob, t, updateStatus, nil
} }
@ -572,21 +577,20 @@ func (jm *ControllerV2) syncCronJob(
// TODO: for Forbid, we could use the same name for every execution, as a lock. // TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time. // With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs. // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid") jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
t := nextScheduleTimeDuration(cronJob, now, sched) t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil return cronJob, t, updateStatus, nil
} }
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent { if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cronJob.Status.Active { for _, j := range cronJob.Status.Active {
klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name)) logger.V(4).Info("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
job, err := jm.jobControl.GetJob(j.Namespace, j.Name) job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
if err != nil { if err != nil {
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return cronJob, nil, updateStatus, err return cronJob, nil, updateStatus, err
} }
if !deleteJob(cronJob, job, jm.jobControl, jm.recorder) { if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name) return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
} }
updateStatus = true updateStatus = true
@ -595,7 +599,7 @@ func (jm *ControllerV2) syncCronJob(
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime) jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
if err != nil { if err != nil {
klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
return cronJob, nil, updateStatus, err return cronJob, nil, updateStatus, err
} }
jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq) jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
@ -603,7 +607,7 @@ func (jm *ControllerV2) syncCronJob(
case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause): case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
case errors.IsAlreadyExists(err): case errors.IsAlreadyExists(err):
// If the job is created by other actor, assume it has updated the cronjob status accordingly // If the job is created by other actor, assume it has updated the cronjob status accordingly
klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName())) logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
return cronJob, nil, updateStatus, err return cronJob, nil, updateStatus, err
case err != nil: case err != nil:
// default error handling // default error handling
@ -612,7 +616,7 @@ func (jm *ControllerV2) syncCronJob(
} }
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds()) metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ // // ------------------------------------------------------------------ //
@ -628,8 +632,8 @@ func (jm *ControllerV2) syncCronJob(
// Add the just-started job to the status list. // Add the just-started job to the status list.
jobRef, err := getRef(jobResp) jobRef, err := getRef(jobResp)
if err != nil { if err != nil {
klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
} }
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef) cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime} cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
@ -665,14 +669,14 @@ func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.Cro
} }
if cj.Spec.SuccessfulJobsHistoryLimit != nil && if cj.Spec.SuccessfulJobsHistoryLimit != nil &&
jm.removeOldestJobs(cj, jm.removeOldestJobs(ctx, cj,
successfulJobs, successfulJobs,
*cj.Spec.SuccessfulJobsHistoryLimit) { *cj.Spec.SuccessfulJobsHistoryLimit) {
updateStatus = true updateStatus = true
} }
if cj.Spec.FailedJobsHistoryLimit != nil && if cj.Spec.FailedJobsHistoryLimit != nil &&
jm.removeOldestJobs(cj, jm.removeOldestJobs(ctx, cj,
failedJobs, failedJobs,
*cj.Spec.FailedJobsHistoryLimit) { *cj.Spec.FailedJobsHistoryLimit) {
updateStatus = true updateStatus = true
@ -691,19 +695,19 @@ func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobCond
} }
// removeOldestJobs removes the oldest jobs from a list of jobs // removeOldestJobs removes the oldest jobs from a list of jobs
func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool { func (jm *ControllerV2) removeOldestJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool {
updateStatus := false updateStatus := false
numToDelete := len(js) - int(maxJobs) numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 { if numToDelete <= 0 {
return updateStatus return updateStatus
} }
logger := klog.FromContext(ctx)
klog.V(4).InfoS("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) logger.V(4).Info("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KObj(cj))
sort.Sort(byJobStartTime(js)) sort.Sort(byJobStartTime(js))
for i := 0; i < numToDelete; i++ { for i := 0; i < numToDelete; i++ {
klog.V(4).InfoS("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName())) logger.V(4).Info("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KObj(cj))
if deleteJob(cj, js[i], jm.jobControl, jm.recorder) { if deleteJob(logger, cj, js[i], jm.jobControl, jm.recorder) {
updateStatus = true updateStatus = true
} }
} }
@ -711,13 +715,11 @@ func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job,
} }
// deleteJob reaps a job, deleting the job, the pods and the reference in the active list // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool { func deleteJob(logger klog.Logger, cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
// delete the job itself... // delete the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err) logger.Error(err, "Error deleting job from cronjob", "job", klog.KObj(job), "cronjob", klog.KObj(cj))
return false return false
} }
// ... and its reference from active list // ... and its reference from active list

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/kubernetes/pkg/apis/batch/install" _ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
@ -1555,9 +1556,12 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
kubeClient := fake.NewSimpleClientset() kubeClient := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) sharedInformers := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
jm, err := NewControllerV2(sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient) jm, err := NewControllerV2(ctx, sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient)
if err != nil { if err != nil {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
return return
@ -1569,7 +1573,7 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
jm.cronJobControl = &fakeCJControl{} jm.cronJobControl = &fakeCJControl{}
jm.recorder = record.NewFakeRecorder(10) jm.recorder = record.NewFakeRecorder(10)
jm.updateCronJob(tt.oldCronJob, tt.newCronJob) jm.updateCronJob(logger, tt.oldCronJob, tt.newCronJob)
if queue.delay.Seconds() != tt.expectedDelay.Seconds() { if queue.delay.Seconds() != tt.expectedDelay.Seconds() {
t.Errorf("Expected delay %#v got %#v", tt.expectedDelay.Seconds(), queue.delay.Seconds()) t.Errorf("Expected delay %#v got %#v", tt.expectedDelay.Seconds(), queue.delay.Seconds())
} }
@ -1652,12 +1656,15 @@ func TestControllerV2GetJobsToBeReconciled(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
kubeClient := fake.NewSimpleClientset() kubeClient := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) sharedInformers := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
for _, job := range tt.jobs { for _, job := range tt.jobs {
sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
} }
jm, err := NewControllerV2(sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient) jm, err := NewControllerV2(ctx, sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient)
if err != nil { if err != nil {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
return return

View File

@ -135,7 +135,7 @@ func nextScheduleTimeDuration(cj *batchv1.CronJob, now time.Time, schedule cron.
// nextScheduleTime returns the time.Time of the next schedule after the last scheduled // nextScheduleTime returns the time.Time of the next schedule after the last scheduled
// and before now, or nil if no unmet schedule times, and an error. // and before now, or nil if no unmet schedule times, and an error.
// If there are too many (>100) unstarted times, it will also record a warning. // If there are too many (>100) unstarted times, it will also record a warning.
func nextScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) { func nextScheduleTime(logger klog.Logger, cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
_, mostRecentTime, numberOfMissedSchedules, err := mostRecentScheduleTime(cj, now, schedule, true) _, mostRecentTime, numberOfMissedSchedules, err := mostRecentScheduleTime(cj, now, schedule, true)
if mostRecentTime == nil || mostRecentTime.After(now) { if mostRecentTime == nil || mostRecentTime.After(now) {
@ -161,7 +161,7 @@ func nextScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule
// I've somewhat arbitrarily picked 100, as more than 80, // I've somewhat arbitrarily picked 100, as more than 80,
// but less than "lots". // but less than "lots".
recorder.Eventf(cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", numberOfMissedSchedules) recorder.Eventf(cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", numberOfMissedSchedules)
klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missedTimes", numberOfMissedSchedules) logger.Info("too many missed times", "cronjob", klog.KObj(cj), "missedTimes", numberOfMissedSchedules)
} }
return mostRecentTime, err return mostRecentTime, err
} }

View File

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog/v2/ktesting"
) )
func TestGetJobFromTemplate2(t *testing.T) { func TestGetJobFromTemplate2(t *testing.T) {
@ -89,6 +90,7 @@ func TestGetJobFromTemplate2(t *testing.T) {
} }
func TestNextScheduleTime(t *testing.T) { func TestNextScheduleTime(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// schedule is hourly on the hour // schedule is hourly on the hour
schedule := "0 * * * ?" schedule := "0 * * * ?"
@ -124,7 +126,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is more than creation time, but less than T1. // Current time is more than creation time, but less than T1.
now := T1.Add(-7 * time.Minute) now := T1.Add(-7 * time.Minute)
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule != nil { if schedule != nil {
t.Errorf("expected no start time, got: %v", schedule) t.Errorf("expected no start time, got: %v", schedule)
} }
@ -135,7 +137,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)} cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
// Current time is after T1 // Current time is after T1
now := T1.Add(2 * time.Second) now := T1.Add(2 * time.Second)
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule == nil { if schedule == nil {
t.Errorf("expected 1 start time, got nil") t.Errorf("expected 1 start time, got nil")
} else if !schedule.Equal(T1) { } else if !schedule.Equal(T1) {
@ -150,7 +152,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.Status.LastScheduleTime = &metav1.Time{Time: T1} cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
// Current time is after T1 // Current time is after T1
now := T1.Add(2 * time.Minute) now := T1.Add(2 * time.Minute)
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule != nil { if schedule != nil {
t.Errorf("expected 0 start times, got: %v", schedule) t.Errorf("expected 0 start times, got: %v", schedule)
} }
@ -163,7 +165,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.Status.LastScheduleTime = &metav1.Time{Time: T1} cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
// Current time is after T1 and after T2 // Current time is after T1 and after T2
now := T2.Add(5 * time.Minute) now := T2.Add(5 * time.Minute)
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule == nil { if schedule == nil {
t.Errorf("expected 1 start times, got nil") t.Errorf("expected 1 start times, got nil")
} else if !schedule.Equal(T2) { } else if !schedule.Equal(T2) {
@ -176,7 +178,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
// Current time is after T1 and after T2 // Current time is after T1 and after T2
now := T2.Add(5 * time.Minute) now := T2.Add(5 * time.Minute)
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule == nil { if schedule == nil {
t.Errorf("expected 1 start times, got nil") t.Errorf("expected 1 start times, got nil")
} else if !schedule.Equal(T2) { } else if !schedule.Equal(T2) {
@ -188,7 +190,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)} cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)} cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
now := T2.Add(10 * 24 * time.Hour) now := T2.Add(10 * 24 * time.Hour)
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule == nil { if schedule == nil {
t.Errorf("expected more than 0 missed times") t.Errorf("expected more than 0 missed times")
} }
@ -201,7 +203,7 @@ func TestNextScheduleTime(t *testing.T) {
// Deadline is short // Deadline is short
deadline := int64(2 * 60 * 60) deadline := int64(2 * 60 * 60)
cj.Spec.StartingDeadlineSeconds = &deadline cj.Spec.StartingDeadlineSeconds = &deadline
schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder) schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
if schedule == nil { if schedule == nil {
t.Errorf("expected more than 0 missed times") t.Errorf("expected more than 0 missed times")
} }
@ -212,7 +214,7 @@ func TestNextScheduleTime(t *testing.T) {
cj.Status.LastScheduleTime = nil cj.Status.LastScheduleTime = nil
now := *deltaTimeAfterTopOfTheHour(1 * time.Hour) now := *deltaTimeAfterTopOfTheHour(1 * time.Hour)
// rogue schedule // rogue schedule
schedule, err := nextScheduleTime(&cj, now, PraseSchedule("59 23 31 2 *"), recorder) schedule, err := nextScheduleTime(logger, &cj, now, PraseSchedule("59 23 31 2 *"), recorder)
if schedule != nil { if schedule != nil {
t.Errorf("expected no start time, got: %v", schedule) t.Errorf("expected no start time, got: %v", schedule)
} }

View File

@ -30,13 +30,14 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/cronjob" "k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
) )
func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.ControllerV2, *job.Controller, informers.SharedInformerFactory, clientset.Interface) { func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.ControllerV2, *job.Controller, informers.SharedInformerFactory, clientset.Interface) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
@ -47,7 +48,7 @@ func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.Controller
} }
resyncPeriod := 12 * time.Hour resyncPeriod := 12 * time.Hour
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "cronjob-informers")), resyncPeriod) informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "cronjob-informers")), resyncPeriod)
cjc, err := cronjob.NewControllerV2(informerSet.Batch().V1().Jobs(), informerSet.Batch().V1().CronJobs(), clientSet) cjc, err := cronjob.NewControllerV2(ctx, informerSet.Batch().V1().Jobs(), informerSet.Batch().V1().CronJobs(), clientSet)
if err != nil { if err != nil {
t.Fatalf("Error creating CronJob controller: %v", err) t.Fatalf("Error creating CronJob controller: %v", err)
} }
@ -144,7 +145,11 @@ func validateJobAndPod(t *testing.T, clientSet clientset.Interface, namespace st
} }
func TestCronJobLaunchesPodAndCleansUp(t *testing.T) { func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
closeFn, cjc, jc, informerSet, clientSet := setup(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
closeFn, cjc, jc, informerSet, clientSet := setup(ctx, t)
defer closeFn() defer closeFn()
cronJobName := "foo" cronJobName := "foo"
@ -155,9 +160,6 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
cjClient := clientSet.BatchV1().CronJobs(ns.Name) cjClient := clientSet.BatchV1().CronJobs(ns.Name)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerSet.Start(ctx.Done()) informerSet.Start(ctx.Done())
go cjc.Run(ctx, 1) go cjc.Run(ctx, 1)
go jc.Run(ctx, 1) go jc.Run(ctx, 1)