diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go
index e51eb8fdd6a..445e2243979 100644
--- a/cmd/kube-controller-manager/app/batch.go
+++ b/cmd/kube-controller-manager/app/batch.go
@@ -23,6 +23,8 @@ import (
 	"context"
 	"fmt"
 
+	"k8s.io/klog/v2"
+
 	"k8s.io/controller-manager/controller"
 	"k8s.io/kubernetes/pkg/controller/cronjob"
 	"k8s.io/kubernetes/pkg/controller/job"
@@ -38,8 +40,8 @@ func startJobController(ctx context.Context, controllerContext ControllerContext
 }
 
 func startCronJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
-
-	cj2c, err := cronjob.NewControllerV2(controllerContext.InformerFactory.Batch().V1().Jobs(),
+	ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "cronjob-controller"))
+	cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
 		controllerContext.InformerFactory.Batch().V1().CronJobs(),
 		controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
 	)
diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go
index cb706bbae45..43ebb1a3201 100644
--- a/pkg/controller/cronjob/cronjob_controllerv2.go
+++ b/pkg/controller/cronjob/cronjob_controllerv2.go
@@ -82,7 +82,8 @@ type ControllerV2 struct {
 }
 
 // NewControllerV2 creates and initializes a new Controller.
-func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
+func NewControllerV2(ctx context.Context, jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
+	logger := klog.FromContext(ctx)
 	eventBroadcaster := record.NewBroadcaster()
 
 	jm := &ControllerV2{
@@ -112,7 +113,9 @@ func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer
 		AddFunc: func(obj interface{}) {
 			jm.enqueueController(obj)
 		},
-		UpdateFunc: jm.updateCronJob,
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			jm.updateCronJob(logger, oldObj, newObj)
+		},
 		DeleteFunc: func(obj interface{}) {
 			jm.enqueueController(obj)
 		},
@@ -134,8 +137,9 @@ func (jm *ControllerV2) Run(ctx context.Context, workers int) {
 
 	defer jm.queue.ShutDown()
 
-	klog.InfoS("Starting cronjob controller v2")
-	defer klog.InfoS("Shutting down cronjob controller v2")
+	logger := klog.FromContext(ctx)
+	logger.Info("Starting cronjob controller v2")
+	defer logger.Info("Shutting down cronjob controller v2")
 
 	if !cache.WaitForNamedCacheSync("cronjob", ctx.Done(), jm.jobListerSynced, jm.cronJobListerSynced) {
 		return
@@ -177,12 +181,12 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
 	if err != nil {
 		return nil, err
 	}
-
+	logger := klog.FromContext(ctx)
 	cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
 	switch {
 	case errors.IsNotFound(err):
 		// may be cronjob is deleted, don't need to requeue this key
-		klog.V(4).InfoS("CronJob not found, may be it is deleted", "cronjob", klog.KRef(ns, name), "err", err)
+		logger.V(4).Info("CronJob not found, may be it is deleted", "cronjob", klog.KRef(ns, name), "err", err)
 		return nil, nil
 	case err != nil:
 		// for other transient apiserver error requeue with exponential backoff
@@ -196,10 +200,10 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
 
 	cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
 	if err != nil {
-		klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
+		logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", err)
 		if updateStatus {
 			if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
-				klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.ResourceVersion, "err", err)
+				logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
 				return nil, err
 			}
 		}
@@ -213,13 +217,13 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
 	// Update the CronJob if needed
 	if updateStatus {
 		if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
-			klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.ResourceVersion, "err", err)
+			logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
 			return nil, err
 		}
 	}
 
 	if requeueAfter != nil {
-		klog.V(4).InfoS("Re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter)
+		logger.V(4).Info("Re-queuing cronjob", "cronjob", klog.KObj(cronJob), "requeueAfter", requeueAfter)
 		return requeueAfter, nil
 	}
 	// this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
@@ -375,7 +379,7 @@ func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration)
 
 // updateCronJob re-queues the CronJob for next scheduled time if there is a
 // change in spec.schedule otherwise it re-queues it now
-func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
+func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr interface{}) {
 	timeZoneEnabled := utilfeature.DefaultFeatureGate.Enabled(features.CronJobTimeZone)
 	oldCJ, okOld := old.(*batchv1.CronJob)
 	newCJ, okNew := curr.(*batchv1.CronJob)
@@ -393,7 +397,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
 		if err != nil {
 			// this is likely a user error in defining the spec value
 			// we should log the error and not reconcile this cronjob until an update to spec
-			klog.V(2).InfoS("Unparseable schedule for cronjob", "cronjob", klog.KRef(newCJ.GetNamespace(), newCJ.GetName()), "schedule", newCJ.Spec.Schedule, "err", err)
+			logger.V(2).Info("Unparseable schedule for cronjob", "cronjob", klog.KObj(newCJ), "schedule", newCJ.Spec.Schedule, "err", err)
 			jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule", "unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)
 			return
 		}
@@ -494,17 +498,18 @@ func (jm *ControllerV2) syncCronJob(
 		return cronJob, nil, updateStatus, nil
 	}
 
+	logger := klog.FromContext(ctx)
 	if timeZoneEnabled && cronJob.Spec.TimeZone != nil {
 		if _, err := time.LoadLocation(*cronJob.Spec.TimeZone); err != nil {
 			timeZone := pointer.StringDeref(cronJob.Spec.TimeZone, "")
-			klog.V(4).InfoS("Not starting job because timeZone is invalid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "timeZone", timeZone, "err", err)
+			logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
 			jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err) return cronJob, nil, updateStatus, nil } } if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend { - klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName())) + logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob)) return cronJob, nil, updateStatus, nil } @@ -512,16 +517,16 @@ func (jm *ControllerV2) syncCronJob( if err != nil { // this is likely a user error in defining the spec value // we should log the error and not reconcile this cronjob until an update to spec - klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err) + logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err) return cronJob, nil, updateStatus, nil } - scheduledTime, err := nextScheduleTime(cronJob, now, sched, jm.recorder) + scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder) if err != nil { // this is likely a user error in defining the spec value // we should log the error and not reconcile this cronjob until an update to spec - klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", cronJob.Spec.Schedule, "err", err) + logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err) jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err) return cronJob, nil, updateStatus, nil } @@ -530,7 +535,7 @@ func (jm *ControllerV2) syncCronJob( // The only time this should happen is if queue is filled after restart. 
 		// Otherwise, the queue is always suppose to trigger sync function at the time of
 		// the scheduled time, that will give atleast 1 unmet time schedule
-		klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
+		logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
 		t := nextScheduleTimeDuration(cronJob, now, sched)
 		return cronJob, t, updateStatus, nil
 	}
@@ -540,7 +545,7 @@ func (jm *ControllerV2) syncCronJob(
 		tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
 	}
 	if tooLate {
-		klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
+		logger.V(4).Info("Missed starting window", "cronjob", klog.KObj(cronJob))
 		jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))
 
 		// TODO: Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
@@ -558,7 +563,7 @@ func (jm *ControllerV2) syncCronJob(
 		Name:      getJobName(cronJob, *scheduledTime),
 		Namespace: cronJob.Namespace,
 	}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
-		klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", scheduledTime)
+		logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
 		t := nextScheduleTimeDuration(cronJob, now, sched)
 		return cronJob, t, updateStatus, nil
 	}
@@ -572,21 +577,20 @@ func (jm *ControllerV2) syncCronJob(
 		// TODO: for Forbid, we could use the same name for every execution, as a lock.
 		// With replace, we could use a name that is deterministic per execution time.
 		// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
-		klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
+		logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
 		jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
 		t := nextScheduleTimeDuration(cronJob, now, sched)
 		return cronJob, t, updateStatus, nil
 	}
 	if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
 		for _, j := range cronJob.Status.Active {
-			klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
-
+			logger.V(4).Info("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
 			job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
 			if err != nil {
 				jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
 				return cronJob, nil, updateStatus, err
 			}
-			if !deleteJob(cronJob, job, jm.jobControl, jm.recorder) {
+			if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
 				return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
 			}
 			updateStatus = true
@@ -595,7 +599,7 @@ func (jm *ControllerV2) syncCronJob(
 
 	jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
 	if err != nil {
-		klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
+		logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
 		return cronJob, nil, updateStatus, err
 	}
 	jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
@@ -603,7 +607,7 @@ func (jm *ControllerV2) syncCronJob(
 	case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
 	case errors.IsAlreadyExists(err):
 		// If the job is created by other actor, assume it has updated the cronjob status accordingly
-		klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
+		logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
 		return cronJob, nil, updateStatus, err
 	case err != nil:
 		// default error handling
@@ -612,7 +616,7 @@ func (jm *ControllerV2) syncCronJob(
 	}
 
 	metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
-	klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
+	logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob))
 	jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
 
 	// ------------------------------------------------------------------ //
@@ -628,8 +632,8 @@ func (jm *ControllerV2) syncCronJob(
 	// Add the just-started job to the status list.
 	jobRef, err := getRef(jobResp)
 	if err != nil {
-		klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
-		return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
+		logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
+		return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
 	}
 	cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
 	cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
@@ -665,14 +669,14 @@ func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.Cro
 	}
 
 	if cj.Spec.SuccessfulJobsHistoryLimit != nil &&
-		jm.removeOldestJobs(cj,
+		jm.removeOldestJobs(ctx, cj,
 			successfulJobs,
 			*cj.Spec.SuccessfulJobsHistoryLimit) {
 		updateStatus = true
 	}
 
 	if cj.Spec.FailedJobsHistoryLimit != nil &&
-		jm.removeOldestJobs(cj,
+		jm.removeOldestJobs(ctx, cj,
 			failedJobs,
 			*cj.Spec.FailedJobsHistoryLimit) {
 		updateStatus = true
@@ -691,19 +695,19 @@ func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobCond
 }
 
 // removeOldestJobs removes the oldest jobs from a list of jobs
-func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool {
+func (jm *ControllerV2) removeOldestJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job, maxJobs int32) bool {
 	updateStatus := false
 	numToDelete := len(js) - int(maxJobs)
 	if numToDelete <= 0 {
 		return updateStatus
 	}
-
-	klog.V(4).InfoS("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("Cleaning up jobs from CronJob list", "deletejobnum", numToDelete, "jobnum", len(js), "cronjob", klog.KObj(cj))
 
 	sort.Sort(byJobStartTime(js))
 	for i := 0; i < numToDelete; i++ {
-		klog.V(4).InfoS("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
-		if deleteJob(cj, js[i], jm.jobControl, jm.recorder) {
+		logger.V(4).Info("Removing job from CronJob list", "job", js[i].Name, "cronjob", klog.KObj(cj))
+		if deleteJob(logger, cj, js[i], jm.jobControl, jm.recorder) {
 			updateStatus = true
 		}
 	}
@@ -711,13 +715,11 @@ func (jm *ControllerV2) removeOldestJobs(cj *batchv1.CronJob, js []*batchv1.Job,
 }
 
 // deleteJob reaps a job, deleting the job, the pods and the reference in the active list
-func deleteJob(cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
-	nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name)
-
+func deleteJob(logger klog.Logger, cj *batchv1.CronJob, job *batchv1.Job, jc jobControlInterface, recorder record.EventRecorder) bool {
 	// delete the job itself...
 	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
 		recorder.Eventf(cj, corev1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
-		klog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
+		logger.Error(err, "Error deleting job from cronjob", "job", klog.KObj(job), "cronjob", klog.KObj(cj))
 		return false
 	}
 	// ... and its reference from active list
diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go
index 507f0b722c8..98fcf0ba934 100644
--- a/pkg/controller/cronjob/cronjob_controllerv2_test.go
+++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/klog/v2/ktesting"
 	_ "k8s.io/kubernetes/pkg/apis/batch/install"
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
@@ -1555,9 +1556,12 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			logger, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
 			kubeClient := fake.NewSimpleClientset()
 			sharedInformers := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
-			jm, err := NewControllerV2(sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient)
+			jm, err := NewControllerV2(ctx, sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient)
 			if err != nil {
 				t.Errorf("unexpected error %v", err)
 				return
@@ -1569,7 +1573,7 @@ func TestControllerV2UpdateCronJob(t *testing.T) {
 			jm.cronJobControl = &fakeCJControl{}
 			jm.recorder = record.NewFakeRecorder(10)
 
-			jm.updateCronJob(tt.oldCronJob, tt.newCronJob)
+			jm.updateCronJob(logger, tt.oldCronJob, tt.newCronJob)
 			if queue.delay.Seconds() != tt.expectedDelay.Seconds() {
 				t.Errorf("Expected delay %#v got %#v", tt.expectedDelay.Seconds(), queue.delay.Seconds())
 			}
@@ -1652,12 +1656,15 @@ func TestControllerV2GetJobsToBeReconciled(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
 			kubeClient := fake.NewSimpleClientset()
 			sharedInformers := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
 			for _, job := range tt.jobs {
 				sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			}
-			jm, err := NewControllerV2(sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient)
+			jm, err := NewControllerV2(ctx, sharedInformers.Batch().V1().Jobs(), sharedInformers.Batch().V1().CronJobs(), kubeClient)
 			if err != nil {
 				t.Errorf("unexpected error %v", err)
 				return
diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go
index e0e2f951341..abe1dc85386 100644
--- a/pkg/controller/cronjob/utils.go
+++ b/pkg/controller/cronjob/utils.go
@@ -135,7 +135,7 @@ func nextScheduleTimeDuration(cj *batchv1.CronJob, now time.Time, schedule cron.
 // nextScheduleTime returns the time.Time of the next schedule after the last scheduled
 // and before now, or nil if no unmet schedule times, and an error.
 // If there are too many (>100) unstarted times, it will also record a warning.
-func nextScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
+func nextScheduleTime(logger klog.Logger, cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
 	_, mostRecentTime, numberOfMissedSchedules, err := mostRecentScheduleTime(cj, now, schedule, true)
 
 	if mostRecentTime == nil || mostRecentTime.After(now) {
@@ -161,7 +161,7 @@ func nextScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule
 		// I've somewhat arbitrarily picked 100, as more than 80,
 		// but less than "lots".
 		recorder.Eventf(cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", numberOfMissedSchedules)
-		klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missedTimes", numberOfMissedSchedules)
+		logger.Info("too many missed times", "cronjob", klog.KObj(cj), "missedTimes", numberOfMissedSchedules)
 	}
 	return mostRecentTime, err
 }
diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go
index e5f5ebcf3b6..2168ed5904b 100644
--- a/pkg/controller/cronjob/utils_test.go
+++ b/pkg/controller/cronjob/utils_test.go
@@ -29,6 +29,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/klog/v2/ktesting"
 )
 
 func TestGetJobFromTemplate2(t *testing.T) {
@@ -89,6 +90,7 @@ func TestGetJobFromTemplate2(t *testing.T) {
 }
 
 func TestNextScheduleTime(t *testing.T) {
+	logger, _ := ktesting.NewTestContext(t)
 	// schedule is hourly on the hour
 	schedule := "0 * * * ?"
 
@@ -124,7 +126,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
 		// Current time is more than creation time, but less than T1.
 		now := T1.Add(-7 * time.Minute)
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule != nil {
 			t.Errorf("expected no start time, got: %v", schedule)
 		}
@@ -135,7 +137,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-10 * time.Minute)}
 		// Current time is after T1
 		now := T1.Add(2 * time.Second)
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule == nil {
 			t.Errorf("expected 1 start time, got nil")
 		} else if !schedule.Equal(T1) {
@@ -150,7 +152,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
 		// Current time is after T1
 		now := T1.Add(2 * time.Minute)
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule != nil {
 			t.Errorf("expected 0 start times, got: %v", schedule)
 		}
@@ -163,7 +165,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.Status.LastScheduleTime = &metav1.Time{Time: T1}
 		// Current time is after T1 and after T2
 		now := T2.Add(5 * time.Minute)
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule == nil {
 			t.Errorf("expected 1 start times, got nil")
 		} else if !schedule.Equal(T2) {
@@ -176,7 +178,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
 		// Current time is after T1 and after T2
 		now := T2.Add(5 * time.Minute)
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule == nil {
 			t.Errorf("expected 1 start times, got nil")
 		} else if !schedule.Equal(T2) {
@@ -188,7 +190,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: T1.Add(-2 * time.Hour)}
 		cj.Status.LastScheduleTime = &metav1.Time{Time: T1.Add(-1 * time.Hour)}
 		now := T2.Add(10 * 24 * time.Hour)
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule == nil {
 			t.Errorf("expected more than 0 missed times")
 		}
@@ -201,7 +203,7 @@ func TestNextScheduleTime(t *testing.T) {
 		// Deadline is short
 		deadline := int64(2 * 60 * 60)
 		cj.Spec.StartingDeadlineSeconds = &deadline
-		schedule, _ := nextScheduleTime(&cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
+		schedule, _ := nextScheduleTime(logger, &cj, now, PraseSchedule(cj.Spec.Schedule), recorder)
 		if schedule == nil {
 			t.Errorf("expected more than 0 missed times")
 		}
@@ -212,7 +214,7 @@ func TestNextScheduleTime(t *testing.T) {
 		cj.Status.LastScheduleTime = nil
 		now := *deltaTimeAfterTopOfTheHour(1 * time.Hour)
 		// rouge schedule
-		schedule, err := nextScheduleTime(&cj, now, PraseSchedule("59 23 31 2 *"), recorder)
+		schedule, err := nextScheduleTime(logger, &cj, now, PraseSchedule("59 23 31 2 *"), recorder)
 		if schedule != nil {
 			t.Errorf("expected no start time, got: %v", schedule)
 		}
diff --git a/test/integration/cronjob/cronjob_test.go b/test/integration/cronjob/cronjob_test.go
index 855ea9c84ad..2c149473a84 100644
--- a/test/integration/cronjob/cronjob_test.go
+++ b/test/integration/cronjob/cronjob_test.go
@@ -30,13 +30,14 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	restclient "k8s.io/client-go/rest"
+	"k8s.io/klog/v2/ktesting"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	"k8s.io/kubernetes/pkg/controller/cronjob"
 	"k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/test/integration/framework"
 )
 
-func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.ControllerV2, *job.Controller, informers.SharedInformerFactory, clientset.Interface) {
+func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.ControllerV2, *job.Controller, informers.SharedInformerFactory, clientset.Interface) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
 
@@ -47,7 +48,7 @@ func setup(t *testing.T) (kubeapiservertesting.TearDownFunc, *cronjob.Controller
 	}
 	resyncPeriod := 12 * time.Hour
 	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "cronjob-informers")), resyncPeriod)
-	cjc, err := cronjob.NewControllerV2(informerSet.Batch().V1().Jobs(), informerSet.Batch().V1().CronJobs(), clientSet)
+	cjc, err := cronjob.NewControllerV2(ctx, informerSet.Batch().V1().Jobs(), informerSet.Batch().V1().CronJobs(), clientSet)
 	if err != nil {
 		t.Fatalf("Error creating CronJob controller: %v", err)
 	}
@@ -144,7 +145,11 @@ func validateJobAndPod(t *testing.T, clientSet clientset.Interface, namespace st
 }
 
 func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
-	closeFn, cjc, jc, informerSet, clientSet := setup(t)
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	closeFn, cjc, jc, informerSet, clientSet := setup(ctx, t)
 	defer closeFn()
 
 	cronJobName := "foo"
@@ -155,9 +160,6 @@ func TestCronJobLaunchesPodAndCleansUp(t *testing.T) {
 
 	cjClient := clientSet.BatchV1().CronJobs(ns.Name)
 
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	informerSet.Start(ctx.Done())
 	go cjc.Run(ctx, 1)
 	go jc.Run(ctx, 1)