mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
cronjob controller: ensure already existing jobs are added to Active list of cronjobs
Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
This commit is contained in:
parent
160fe010f3
commit
301aa69fec
@ -585,6 +585,7 @@ func (jm *ControllerV2) syncCronJob(
|
||||
}
|
||||
}
|
||||
|
||||
jobAlreadyExists := false
|
||||
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
|
||||
if err != nil {
|
||||
logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
|
||||
@ -597,18 +598,41 @@ func (jm *ControllerV2) syncCronJob(
|
||||
// anything because any creation will fail
|
||||
return nil, updateStatus, err
|
||||
case errors.IsAlreadyExists(err):
|
||||
// If the job is created by other actor, assume it has updated the cronjob status accordingly
|
||||
logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
|
||||
return nil, updateStatus, err
|
||||
// If the job is created by other actor, assume it has updated the cronjob status accordingly.
|
||||
// However, if the job was created by cronjob controller, this means we've previously created the job
|
||||
// but failed to update the active list in the status, in which case we should reattempt to add the job
|
||||
// into the active list and update the status.
|
||||
jobAlreadyExists = true
|
||||
job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName())
|
||||
if err != nil {
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
jobResp = job
|
||||
|
||||
// check that this job is owned by cronjob controller, otherwise do nothing and assume external controller
|
||||
// is updating the status.
|
||||
if !metav1.IsControlledBy(job, cronJob) {
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
|
||||
// Recheck if the job is missing from the active list before attempting to update the status again.
|
||||
found := inActiveList(cronJob, job.ObjectMeta.UID)
|
||||
if found {
|
||||
return nil, updateStatus, nil
|
||||
}
|
||||
case err != nil:
|
||||
// default error handling
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
|
||||
return nil, updateStatus, err
|
||||
}
|
||||
|
||||
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
|
||||
logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob))
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
|
||||
if jobAlreadyExists {
|
||||
logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
|
||||
} else {
|
||||
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
|
||||
logger.V(4).Info("Created Job", "job", klog.KObj(jobResp), "cronjob", klog.KObj(cronJob))
|
||||
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------ //
|
||||
|
||||
|
@ -466,10 +466,22 @@ func TestControllerV2SyncCronJob(t *testing.T) {
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: *justAfterTheHour(),
|
||||
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
|
||||
expectErr: true,
|
||||
expectErr: false,
|
||||
expectUpdateStatus: true,
|
||||
jobPresentInCJActiveStatus: true,
|
||||
},
|
||||
"prev ran but done, is time, job not present in CJ active status, create job failed, A": {
|
||||
concurrencyPolicy: "Allow",
|
||||
schedule: onTheHour,
|
||||
deadline: noDead,
|
||||
ranPreviously: true,
|
||||
jobCreationTime: justAfterThePriorHour(),
|
||||
now: *justAfterTheHour(),
|
||||
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
|
||||
expectErr: false,
|
||||
expectUpdateStatus: true,
|
||||
jobPresentInCJActiveStatus: false,
|
||||
},
|
||||
"prev ran but done, is time, F": {
|
||||
concurrencyPolicy: "Forbid",
|
||||
schedule: onTheHour,
|
||||
@ -1812,3 +1824,103 @@ func TestControllerV2CleanupFinishedJobs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestControllerV2JobAlreadyExistsButNotInActiveStatus validates that an already created job that was not added to the status
|
||||
// of a CronJob initially will be added back on the next sync. Previously, if we failed to update the status after creating a job,
|
||||
// cronjob controller would retry continuously because it would attempt to create a job that already exists.
|
||||
func TestControllerV2JobAlreadyExistsButNotInActiveStatus(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
|
||||
cj := cronJob()
|
||||
cj.Spec.ConcurrencyPolicy = "Forbid"
|
||||
cj.Spec.Schedule = everyHour
|
||||
cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()}
|
||||
cj.Status.Active = []v1.ObjectReference{}
|
||||
cjCopy := cj.DeepCopy()
|
||||
|
||||
job, err := getJobFromTemplate2(&cj, justAfterThePriorHour())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error creating a job from template: %v", err)
|
||||
}
|
||||
job.UID = "1234"
|
||||
job.Namespace = cj.Namespace
|
||||
|
||||
client := fake.NewSimpleClientset(cjCopy, job)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy)
|
||||
|
||||
jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
|
||||
jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")}
|
||||
jm.jobControl = jobControl
|
||||
cronJobControl := &fakeCJControl{}
|
||||
jm.cronJobControl = cronJobControl
|
||||
jm.now = justBeforeTheHour
|
||||
|
||||
jm.enqueueController(cjCopy)
|
||||
jm.processNextWorkItem(ctx)
|
||||
|
||||
if len(cronJobControl.Updates) != 1 {
|
||||
t.Fatalf("Unexpected updates to cronjob, got: %d, expected 1", len(cronJobControl.Updates))
|
||||
}
|
||||
if len(cronJobControl.Updates[0].Status.Active) != 1 {
|
||||
t.Errorf("Unexpected active jobs count, got: %d, expected 1", len(cronJobControl.Updates[0].Status.Active))
|
||||
}
|
||||
|
||||
expectedActiveRef, err := getRef(job)
|
||||
if err != nil {
|
||||
t.Fatalf("Error getting expected job ref: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(cronJobControl.Updates[0].Status.Active[0], *expectedActiveRef) {
|
||||
t.Errorf("Unexpected job reference in cronjob active list, got: %v, expected: %v", cronJobControl.Updates[0].Status.Active[0], expectedActiveRef)
|
||||
}
|
||||
}
|
||||
|
||||
// TestControllerV2JobAlreadyExistsButDifferentOwnner validates that an already created job
|
||||
// not owned by the cronjob controller is ignored.
|
||||
func TestControllerV2JobAlreadyExistsButDifferentOwner(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
|
||||
cj := cronJob()
|
||||
cj.Spec.ConcurrencyPolicy = "Forbid"
|
||||
cj.Spec.Schedule = everyHour
|
||||
cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()}
|
||||
cj.Status.Active = []v1.ObjectReference{}
|
||||
cjCopy := cj.DeepCopy()
|
||||
|
||||
job, err := getJobFromTemplate2(&cj, justAfterThePriorHour())
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error creating a job from template: %v", err)
|
||||
}
|
||||
job.UID = "1234"
|
||||
job.Namespace = cj.Namespace
|
||||
|
||||
// remove owners for this test since we are testing that jobs not belonging to cronjob
|
||||
// controller are safely ignored
|
||||
job.OwnerReferences = []metav1.OwnerReference{}
|
||||
|
||||
client := fake.NewSimpleClientset(cjCopy, job)
|
||||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy)
|
||||
|
||||
jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
|
||||
jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")}
|
||||
jm.jobControl = jobControl
|
||||
cronJobControl := &fakeCJControl{}
|
||||
jm.cronJobControl = cronJobControl
|
||||
jm.now = justBeforeTheHour
|
||||
|
||||
jm.enqueueController(cjCopy)
|
||||
jm.processNextWorkItem(ctx)
|
||||
|
||||
if len(cronJobControl.Updates) != 0 {
|
||||
t.Fatalf("Unexpected updates to cronjob, got: %d, expected 0", len(cronJobControl.Updates))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user