Fix a job quota related deadlock (#119776)

* Fix a job quota related deadlock

In case ResourceQuota is used and sets a max # of jobs, a CronJob may get
trapped in a deadlock:
  1. Job quota for a namespace is reached.
  2. CronJob controller can't create a new job, because quota is
     reached.
  3. Cleanup of jobs owned by a cronjob doesn't happen, because a
     control loop iteration is finished because of an error to create a
     job.

To fix this we stop early quitting from a control loop iteration when
cronjob reconciliation failed and always let old jobs to be cleaned up.

* Dont reorder imports

* Don't stop requeuing on reconciliation error

Previous code only logged the reconciliation error inside jm.sync() and
didn't return the reconciliation error to it's invoker
processNextWorkItem().

Adding a copy-paste back to avoid this issue.

* Remove copy-pasted cleanupFinishedJobs()

Now we always call jm.cleanupFinishedJobs() first and then
jm.syncCronJob().

We also extract cronJobCopy and updateStatus outside jm.syncCronJob
function and pass pointers to them in both jm.syncCronJob and
jm.cleanupFinishedJobs to make delayed updates handling more explicit
and not dependent on the order in which cleanupFinishedJobs and
syncCronJob are invoked.

* Return updateStatus bool instead of changing the reference

* Explicitly ignore err in tests to fix linter
This commit is contained in:
Albert Sverdlov 2023-08-31 17:25:00 +02:00 committed by GitHub
parent 8777747c98
commit a46bab6930
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 152 additions and 39 deletions

View File

@ -196,24 +196,19 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
return nil, err
}
cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
// cronJobCopy is used to combine all the updates to a
// CronJob object and perform an actual update only once.
cronJobCopy := cronJob.DeepCopy()
updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
if err != nil {
logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", err)
if updateStatus {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err
}
}
return nil, err
}
if jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) {
updateStatus = true
}
// Update the CronJob if needed
if updateStatus {
if updateStatusAfterCleanup || updateStatusAfterSync {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err
@ -225,7 +220,7 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
return requeueAfter, nil
}
// this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
return nil, nil
return nil, syncErr
}
// resolveControllerRef returns the controller referenced by a ControllerRef,
@ -416,15 +411,12 @@ func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr
// syncCronJob reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cronJob" should be included in "jobs".
// The current time is passed in to facilitate testing.
// It returns a copy of the CronJob that is to be used by other functions
// that mutates the object
// It also returns a bool to indicate an update to api-server is needed
// It returns a bool to indicate an update to api-server is needed
func (jm *ControllerV2) syncCronJob(
ctx context.Context,
cronJob *batchv1.CronJob,
jobs []*batchv1.Job) (*batchv1.CronJob, *time.Duration, bool, error) {
jobs []*batchv1.Job) (*time.Duration, bool, error) {
cronJob = cronJob.DeepCopy()
now := jm.now()
updateStatus := false
@ -435,7 +427,7 @@ func (jm *ControllerV2) syncCronJob(
if !found && !IsJobFinished(j) {
cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
if err != nil {
return nil, nil, updateStatus, err
return nil, updateStatus, err
}
if inActiveList(cjCopy, j.ObjectMeta.UID) {
cronJob = cjCopy
@ -483,7 +475,7 @@ func (jm *ControllerV2) syncCronJob(
deleteFromActiveList(cronJob, j.UID)
updateStatus = true
case err != nil:
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
// the job is missing in the lister but found in api-server
}
@ -491,7 +483,7 @@ func (jm *ControllerV2) syncCronJob(
if cronJob.DeletionTimestamp != nil {
// The CronJob is being deleted.
// Don't do anything other than updating status.
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
logger := klog.FromContext(ctx)
@ -500,13 +492,13 @@ func (jm *ControllerV2) syncCronJob(
if _, err := time.LoadLocation(timeZone); err != nil {
logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
}
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob))
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
@ -515,7 +507,7 @@ func (jm *ControllerV2) syncCronJob(
// we should log the error and not reconcile this cronjob until an update to spec
logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
@ -524,7 +516,7 @@ func (jm *ControllerV2) syncCronJob(
// we should log the error and not reconcile this cronjob until an update to spec
logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
if scheduledTime == nil {
// no unmet start time, return cj,.
@ -533,7 +525,7 @@ func (jm *ControllerV2) syncCronJob(
// the scheduled time, that will give atleast 1 unmet time schedule
logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
tooLate := false
@ -552,7 +544,7 @@ func (jm *ControllerV2) syncCronJob(
// and event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
if inActiveListByName(cronJob, &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
@ -561,7 +553,7 @@ func (jm *ControllerV2) syncCronJob(
}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
@ -576,7 +568,7 @@ func (jm *ControllerV2) syncCronJob(
logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cronJob.Status.Active {
@ -584,10 +576,10 @@ func (jm *ControllerV2) syncCronJob(
job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
if err != nil {
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
}
updateStatus = true
}
@ -596,22 +588,22 @@ func (jm *ControllerV2) syncCronJob(
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
if err != nil {
logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
switch {
case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
// if the namespace is being terminated, we don't have to do
// anything because any creation will fail
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
case errors.IsAlreadyExists(err):
// If the job is created by other actor, assume it has updated the cronjob status accordingly
logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
case err != nil:
// default error handling
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
@ -632,14 +624,14 @@ func (jm *ControllerV2) syncCronJob(
jobRef, err := getRef(jobResp)
if err != nil {
logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"
@ -36,6 +37,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/pointer"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
@ -1258,7 +1261,8 @@ func TestControllerV2SyncCronJob(t *testing.T) {
return tc.now
},
}
cjCopy, requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), &cj, js)
cjCopy := cj.DeepCopy()
requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), cjCopy, js)
if tc.expectErr && err == nil {
t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter)
}
@ -1691,3 +1695,120 @@ func TestControllerV2GetJobsToBeReconciled(t *testing.T) {
})
}
}
func TestControllerV2CleanupFinishedJobs(t *testing.T) {
tests := []struct {
name string
now time.Time
cronJob *batchv1.CronJob
finishedJobs []*batchv1.Job
jobCreateError error
expectedDeletedJobs []string
}{
{
name: "jobs are still deleted when a cronjob can't create jobs due to jobs quota being reached (avoiding a deadlock)",
now: *justAfterTheHour(),
cronJob: &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"},
Spec: batchv1.CronJobSpec{
Schedule: onTheHour,
SuccessfulJobsHistoryLimit: pointer.Int32(1),
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}},
},
},
Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}},
},
finishedJobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo-ns",
Name: "finished-job-started-hour-ago",
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
},
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo-ns",
Name: "finished-job-started-minute-ago",
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
},
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeTheHour()}},
},
},
jobCreateError: errors.NewInternalError(fmt.Errorf("quota for # of jobs reached")),
expectedDeletedJobs: []string{"finished-job-started-hour-ago"},
},
{
name: "jobs are not deleted if history limit not reached",
now: justBeforeTheHour(),
cronJob: &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"},
Spec: batchv1.CronJobSpec{
Schedule: onTheHour,
SuccessfulJobsHistoryLimit: pointer.Int32(2),
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}},
},
},
Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}},
},
finishedJobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo-ns",
Name: "finished-job-started-hour-ago",
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
},
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}},
},
},
jobCreateError: nil,
expectedDeletedJobs: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
for _, job := range tt.finishedJobs {
job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: v1.ConditionTrue}}
}
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(tt.cronJob)
for _, job := range tt.finishedJobs {
_ = informerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
}
jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
if err != nil {
t.Errorf("unexpected error %v", err)
return
}
jobControl := &fakeJobControl{CreateErr: tt.jobCreateError}
jm.jobControl = jobControl
jm.now = func() time.Time {
return tt.now
}
jm.enqueueController(tt.cronJob)
jm.processNextWorkItem(ctx)
if len(tt.expectedDeletedJobs) != len(jobControl.DeleteJobName) {
t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName)
}
sort.Strings(jobControl.DeleteJobName)
sort.Strings(tt.expectedDeletedJobs)
for i, deletedJob := range jobControl.DeleteJobName {
if deletedJob != tt.expectedDeletedJobs[i] {
t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName)
}
}
})
}
}