diff --git a/pkg/controller/cronjob/BUILD b/pkg/controller/cronjob/BUILD index 26cd161379e..94dfb52d41d 100644 --- a/pkg/controller/cronjob/BUILD +++ b/pkg/controller/cronjob/BUILD @@ -24,6 +24,7 @@ go_library( "//pkg/apis/batch/v1:go_default_library", "//pkg/apis/batch/v2alpha1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/robfig/cron:go_default_library", diff --git a/pkg/controller/cronjob/cronjob_controller.go b/pkg/controller/cronjob/cronjob_controller.go index cc664b5bca7..d78689a82c4 100644 --- a/pkg/controller/cronjob/cronjob_controller.go +++ b/pkg/controller/cronjob/cronjob_controller.go @@ -247,6 +247,12 @@ func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobC return } + if err := adoptJobs(sj, js, jc); err != nil { + // This is fine. We will retry later. + // Adoption is only to advise other controllers. We don't rely on it. + glog.V(4).Infof("Unable to adopt Jobs for CronJob %v: %v", nameForLog, err) + } + if sj.Spec.Suspend != nil && *sj.Spec.Suspend { glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) return diff --git a/pkg/controller/cronjob/injection.go b/pkg/controller/cronjob/injection.go index ba33de68d8d..ecf1b3c89ce 100644 --- a/pkg/controller/cronjob/injection.go +++ b/pkg/controller/cronjob/injection.go @@ -120,6 +120,7 @@ type fakeJobControl struct { Jobs []batchv1.Job DeleteJobName []string Err error + UpdateJobName []string } var _ jobControlInterface = &fakeJobControl{} @@ -151,6 +152,7 @@ func (f *fakeJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1 if f.Err != nil { return nil, f.Err } + f.UpdateJobName = append(f.UpdateJobName, job.Name) return job, nil } diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index c755d3ff8b0..eee83331c8e 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -28,11 +28,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1/ref" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" + "k8s.io/kubernetes/pkg/controller" ) // Utilities for dealing with Jobs and CronJobs and time. @@ -279,3 +281,35 @@ func (o byJobStartTime) Less(i, j int) bool { return (*o[i].Status.StartTime).Before(*o[j].Status.StartTime) } + +// adoptJobs applies missing ControllerRefs to Jobs created by a CronJob. +// +// This should only happen if the Jobs were created by an older version of the +// CronJob controller, since from now on we add ControllerRef upon creation. +// +// CronJob doesn't do actual adoption because it doesn't use label selectors to +// find its Jobs. However, we should apply ControllerRef for potential +// server-side cascading deletion, and to advise other controllers we own these +// objects. +func adoptJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface) error { + var errs []error + sjControllerRef := *newControllerRef(sj) + for i := range js { + job := &js[i] + controllerRef := controller.GetControllerOf(job) + if controllerRef != nil { + continue + } + job.OwnerReferences = append(job.OwnerReferences, sjControllerRef) + updatedJob, err := jc.UpdateJob(job.Namespace, job) + if err != nil { + // If there's a ResourceVersion or other error, don't bother retrying. + // We will just try again on a subsequent CronJob sync. + errs = append(errs, err) + continue + } + // Save it back to the array for later consumers. + js[i] = *updatedJob + } + return utilerrors.NewAggregate(errs) +} diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go index 66ebffdd04e..299393fa1f3 100644 --- a/pkg/controller/cronjob/utils_test.go +++ b/pkg/controller/cronjob/utils_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" + "k8s.io/kubernetes/pkg/controller" ) func TestGetJobFromTemplate(t *testing.T) { @@ -366,5 +367,30 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { t.Errorf("unexpected error") } } - +} + +func TestAdoptJobs(t *testing.T) { + sj := cronJob() + jc := &fakeJobControl{} + jobs := []batchv1.Job{newJob("uid0"), newJob("uid1")} + jobs[0].OwnerReferences = nil + jobs[0].Name = "job0" + jobs[1].OwnerReferences = []metav1.OwnerReference{*newControllerRef(&sj)} + jobs[1].Name = "job1" + + if err := adoptJobs(&sj, jobs, jc); err != nil { + t.Errorf("adoptJobs() error: %v", err) + } + for i := range jobs { + controllerRef := controller.GetControllerOf(&jobs[i]) + if controllerRef == nil { + t.Errorf("Job should have ControllerRef: %#v", jobs[i]) + } + } + if got, want := len(jc.UpdateJobName), 1; got != want { + t.Errorf("len(UpdateJobName) = %v, want %v", got, want) + } + if got, want := jc.UpdateJobName[0], "job0"; got != want { + t.Errorf("UpdateJobName = %v, want %v", got, want) + } }