From 301aa69fec5e1d5dbdb19c3edc50d61fd7fc728d Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 13 Sep 2023 00:54:25 +0000 Subject: [PATCH] cronjob controller: ensure already existing jobs are added to Active list of cronjobs Signed-off-by: Andrew Sy Kim --- .../cronjob/cronjob_controllerv2.go | 36 +++++- .../cronjob/cronjob_controllerv2_test.go | 114 +++++++++++++++++- 2 files changed, 143 insertions(+), 7 deletions(-) diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index ac4d63eb8a0..bb2da6bf080 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -585,6 +585,7 @@ func (jm *ControllerV2) syncCronJob( } } + jobAlreadyExists := false jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime) if err != nil { logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob)) @@ -597,18 +598,41 @@ func (jm *ControllerV2) syncCronJob( // anything because any creation will fail return nil, updateStatus, err case errors.IsAlreadyExists(err): - // If the job is created by other actor, assume it has updated the cronjob status accordingly - logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq)) - return nil, updateStatus, err + // If the job is created by other actor, assume it has updated the cronjob status accordingly. + // However, if the job was created by cronjob controller, this means we've previously created the job + // but failed to update the active list in the status, in which case we should reattempt to add the job + // into the active list and update the status. + jobAlreadyExists = true + job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName()) + if err != nil { + return nil, updateStatus, err + } + jobResp = job + + // check that this job is owned by cronjob controller, otherwise do nothing and assume external controller + // is updating the status. + if !metav1.IsControlledBy(job, cronJob) { + return nil, updateStatus, nil + } + + // Recheck if the job is missing from the active list before attempting to update the status again. + found := inActiveList(cronJob, job.ObjectMeta.UID) + if found { + return nil, updateStatus, nil + } case err != nil: // default error handling jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) return nil, updateStatus, err } - metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds()) - logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob)) - jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) + if jobAlreadyExists { + logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq)) + } else { + metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds()) + logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob)) + jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) + } // ------------------------------------------------------------------ // diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go index 876adcc75b3..ed1436713ad 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2_test.go +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -466,10 +466,22 @@ func TestControllerV2SyncCronJob(t *testing.T) { jobCreationTime: justAfterThePriorHour(), now: *justAfterTheHour(), jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""), - expectErr: true, + expectErr: false, expectUpdateStatus: true, jobPresentInCJActiveStatus: true, }, + "prev ran but done, is time, job not present in CJ active status, create job failed, A": { + concurrencyPolicy: "Allow", + schedule: onTheHour, + deadline: noDead, + ranPreviously: true, + jobCreationTime: justAfterThePriorHour(), + now: *justAfterTheHour(), + jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""), + expectErr: false, + expectUpdateStatus: true, + jobPresentInCJActiveStatus: false, + }, "prev ran but done, is time, F": { concurrencyPolicy: "Forbid", schedule: onTheHour, @@ -1812,3 +1824,103 @@ func TestControllerV2CleanupFinishedJobs(t *testing.T) { }) } } + +// TestControllerV2JobAlreadyExistsButNotInActiveStatus validates that an already created job that was not added to the status +// of a CronJob initially will be added back on the next sync. Previously, if we failed to update the status after creating a job, +// cronjob controller would retry continuously because it would attempt to create a job that already exists. +func TestControllerV2JobAlreadyExistsButNotInActiveStatus(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + cj := cronJob() + cj.Spec.ConcurrencyPolicy = "Forbid" + cj.Spec.Schedule = everyHour + cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()} + cj.Status.Active = []v1.ObjectReference{} + cjCopy := cj.DeepCopy() + + job, err := getJobFromTemplate2(&cj, justAfterThePriorHour()) + if err != nil { + t.Fatalf("Unexpected error creating a job from template: %v", err) + } + job.UID = "1234" + job.Namespace = cj.Namespace + + client := fake.NewSimpleClientset(cjCopy, job) + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + _ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy) + + jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")} + jm.jobControl = jobControl + cronJobControl := &fakeCJControl{} + jm.cronJobControl = cronJobControl + jm.now = justBeforeTheHour + + jm.enqueueController(cjCopy) + jm.processNextWorkItem(ctx) + + if len(cronJobControl.Updates) != 1 { + t.Fatalf("Unexpected updates to cronjob, got: %d, expected 1", len(cronJobControl.Updates)) + } + if len(cronJobControl.Updates[0].Status.Active) != 1 { + t.Errorf("Unexpected active jobs count, got: %d, expected 1", len(cronJobControl.Updates[0].Status.Active)) + } + + expectedActiveRef, err := getRef(job) + if err != nil { + t.Fatalf("Error getting expected job ref: %v", err) + } + if !reflect.DeepEqual(cronJobControl.Updates[0].Status.Active[0], *expectedActiveRef) { + t.Errorf("Unexpected job reference in cronjob active list, got: %v, expected: %v", cronJobControl.Updates[0].Status.Active[0], expectedActiveRef) + } +} + +// TestControllerV2JobAlreadyExistsButDifferentOwnner validates that an already created job +// not owned by the cronjob controller is ignored. +func TestControllerV2JobAlreadyExistsButDifferentOwner(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + cj := cronJob() + cj.Spec.ConcurrencyPolicy = "Forbid" + cj.Spec.Schedule = everyHour + cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()} + cj.Status.Active = []v1.ObjectReference{} + cjCopy := cj.DeepCopy() + + job, err := getJobFromTemplate2(&cj, justAfterThePriorHour()) + if err != nil { + t.Fatalf("Unexpected error creating a job from template: %v", err) + } + job.UID = "1234" + job.Namespace = cj.Namespace + + // remove owners for this test since we are testing that jobs not belonging to cronjob + // controller are safely ignored + job.OwnerReferences = []metav1.OwnerReference{} + + client := fake.NewSimpleClientset(cjCopy, job) + informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + _ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy) + + jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")} + jm.jobControl = jobControl + cronJobControl := &fakeCJControl{} + jm.cronJobControl = cronJobControl + jm.now = justBeforeTheHour + + jm.enqueueController(cjCopy) + jm.processNextWorkItem(ctx) + + if len(cronJobControl.Updates) != 0 { + t.Fatalf("Unexpected updates to cronjob, got: %d, expected 0", len(cronJobControl.Updates)) + } +}