diff --git a/pkg/controller/cronjob/BUILD b/pkg/controller/cronjob/BUILD index 232d785e87c..94dfb52d41d 100644 --- a/pkg/controller/cronjob/BUILD +++ b/pkg/controller/cronjob/BUILD @@ -24,6 +24,7 @@ go_library( "//pkg/apis/batch/v1:go_default_library", "//pkg/apis/batch/v2alpha1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", + "//pkg/controller:go_default_library", "//pkg/util/metrics:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/robfig/cron:go_default_library", @@ -54,6 +55,7 @@ go_test( "//pkg/api/v1:go_default_library", "//pkg/apis/batch/v1:go_default_library", "//pkg/apis/batch/v2alpha1:go_default_library", + "//pkg/controller:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/controller/cronjob/cronjob_controller.go b/pkg/controller/cronjob/cronjob_controller.go index 1d7af75f7e6..00b109aae97 100644 --- a/pkg/controller/cronjob/cronjob_controller.go +++ b/pkg/controller/cronjob/cronjob_controller.go @@ -56,6 +56,9 @@ import ( // Utilities for dealing with Jobs and CronJobs and time. +// controllerKind contains the schema.GroupVersionKind for this controller type. +var controllerKind = batchv2alpha1.SchemeGroupVersion.WithKind("CronJob") + type CronJobController struct { kubeClient clientset.Interface jobControl jobControlInterface @@ -102,22 +105,26 @@ func (jm *CronJobController) Run(stopCh <-chan struct{}) { // syncAll lists all the CronJobs and Jobs and reconciles them. func (jm *CronJobController) syncAll() { - sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) - if err != nil { - glog.Errorf("Error listing cronjobs: %v", err) - return - } - sjs := sjl.Items - glog.V(4).Infof("Found %d cronjobs", len(sjs)) - + // List children (Jobs) before parents (CronJob). + // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer, + // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639). + // Note that this only works because we are NOT using any caches here. jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { - glog.Errorf("Error listing jobs") + utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err)) return } js := jl.Items glog.V(4).Infof("Found %d jobs", len(js)) + sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) + if err != nil { + utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err)) + return + } + sjs := sjl.Items + glog.V(4).Infof("Found %d cronjobs", len(sjs)) + jobsBySj := groupJobsByParent(js) glog.V(4).Infof("Found %d groups", len(jobsBySj)) @@ -238,6 +245,18 @@ func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobC } *sj = *updatedSJ + if sj.DeletionTimestamp != nil { + // The CronJob is being deleted. + // Don't do anything other than updating status. + return + } + + if err := adoptJobs(sj, js, jc); err != nil { + // This is fine. We will retry later. + // Adoption is only to advise other controllers. We don't rely on it. + glog.V(4).Infof("Unable to adopt Jobs for CronJob %v: %v", nameForLog, err) + } + if sj.Spec.Suspend != nil && *sj.Spec.Suspend { glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) return diff --git a/pkg/controller/cronjob/cronjob_controller_test.go b/pkg/controller/cronjob/cronjob_controller_test.go index f3281c8cbf9..d9f9b667c3d 100644 --- a/pkg/controller/cronjob/cronjob_controller_test.go +++ b/pkg/controller/cronjob/cronjob_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" + "k8s.io/kubernetes/pkg/controller" ) // schedule is hourly on the hour @@ -289,6 +290,29 @@ func TestSyncOne_RunOrNot(t *testing.T) { if len(jc.Jobs) != expectedCreates { t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs)) } + for i := range jc.Jobs { + job := &jc.Jobs[i] + controllerRef := controller.GetControllerOf(job) + if controllerRef == nil { + t.Errorf("%s: expected job to have ControllerRef: %#v", name, job) + } else { + if got, want := controllerRef.APIVersion, "batch/v2alpha1"; got != want { + t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want) + } + if got, want := controllerRef.Kind, "CronJob"; got != want { + t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want) + } + if got, want := controllerRef.Name, sj.Name; got != want { + t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want) + } + if got, want := controllerRef.UID, sj.UID; got != want { + t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want) + } + if controllerRef.Controller == nil || *controllerRef.Controller != true { + t.Errorf("%s: controllerRef.Controller is not set to true", name) + } + } + } expectedDeletes := 0 if tc.expectDelete { @@ -583,58 +607,61 @@ func TestSyncOne_Status(t *testing.T) { now time.Time hasUnexpectedJob bool hasMissingJob bool + beingDeleted bool // expectations expectCreate bool expectDelete bool }{ - "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F}, - "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F}, - "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F}, - "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F}, - "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F}, - "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F}, - "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, F}, - "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F, F}, - "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, F, T, F}, + "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F}, + "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F}, + "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F}, + "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, T, F}, + "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, T, F}, + "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, T, F}, + "never ran, is time, deleting": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F, F}, + "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, F, F}, + "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F, F, F}, + "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, F, F, T, F}, - "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F}, - "prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F}, - "prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F}, - "prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F}, - "prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F}, - "prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F}, - "prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F}, - "prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F}, - "prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F}, - "prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F}, - "prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F}, - "prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F}, + "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F, F}, + "prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F}, + "prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F}, + "prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F}, + "prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F, F}, + "prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F, F}, + "prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F}, + "prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F, F}, + "prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F}, + "prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F}, + "prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F}, + "prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F}, - "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, F}, - "prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, F}, - "prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, F}, - "prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, F}, - "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F, F}, - "prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F, F}, - "prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F, F}, - "prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F, F}, - "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, F, T, F}, - "prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, T, F}, - "prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, T, F}, + "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, deleting": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F, F}, + "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, F, F}, + "prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, F, F}, + "prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, F, F}, + "prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, F, F}, + "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F, F, F}, + "prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F, F, F}, + "prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F, F, F}, + "prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F, F, F}, + "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, F, F, T, F}, + "prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, F, T, F}, + "prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, F, T, F}, } for name, tc := range testCases { @@ -674,6 +701,10 @@ func TestSyncOne_Status(t *testing.T) { } sj.Status.Active = append(sj.Status.Active, *ref) } + if tc.beingDeleted { + timestamp := metav1.NewTime(tc.now) + sj.DeletionTimestamp = ×tamp + } jc := &fakeJobControl{} sjc := &fakeSJControl{} diff --git a/pkg/controller/cronjob/injection.go b/pkg/controller/cronjob/injection.go index ba33de68d8d..d0261377b35 100644 --- a/pkg/controller/cronjob/injection.go +++ b/pkg/controller/cronjob/injection.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" @@ -63,13 +64,15 @@ func (c *fakeSJControl) UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1. // jobControlInterface is an interface that knows how to add or delete jobs // created as an interface to allow testing. type jobControlInterface interface { - // GetJob retrieves a job + // GetJob retrieves a Job. GetJob(namespace, name string) (*batchv1.Job, error) - // CreateJob creates new jobs according to the spec + // CreateJob creates new Jobs according to the spec. CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) - // UpdateJob updates a job + // UpdateJob updates a Job. UpdateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) - // DeleteJob deletes the job identified by name. + // PatchJob patches a Job. + PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) + // DeleteJob deletes the Job identified by name. // TODO: delete by UID? DeleteJob(namespace string, name string) error } @@ -106,6 +109,10 @@ func (r realJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1. return r.KubeClient.BatchV1().Jobs(namespace).Update(job) } +func (r realJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) { + return r.KubeClient.BatchV1().Jobs(namespace).Patch(name, pt, data, subresources...) +} + func (r realJobControl) CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) { return r.KubeClient.BatchV1().Jobs(namespace).Create(job) } @@ -120,6 +127,9 @@ type fakeJobControl struct { Jobs []batchv1.Job DeleteJobName []string Err error + UpdateJobName []string + PatchJobName []string + Patches [][]byte } var _ jobControlInterface = &fakeJobControl{} @@ -151,9 +161,22 @@ func (f *fakeJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1 if f.Err != nil { return nil, f.Err } + f.UpdateJobName = append(f.UpdateJobName, job.Name) return job, nil } +func (f *fakeJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return nil, f.Err + } + f.PatchJobName = append(f.PatchJobName, name) + f.Patches = append(f.Patches, data) + // We don't have anything to return. Just return something non-nil. + return &batchv1.Job{}, nil +} + func (f *fakeJobControl) DeleteJob(namespace string, name string) error { f.Lock() defer f.Unlock() diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index 4d59ab5b3bd..f456c62ad6d 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -28,11 +28,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1/ref" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" + "k8s.io/kubernetes/pkg/controller" ) // Utilities for dealing with Jobs and CronJobs and time. @@ -180,6 +182,19 @@ func getRecentUnmetScheduleTimes(sj batchv2alpha1.CronJob, now time.Time) ([]tim return starts, nil } +func newControllerRef(sj *batchv2alpha1.CronJob) *metav1.OwnerReference { + blockOwnerDeletion := true + isController := true + return &metav1.OwnerReference{ + APIVersion: controllerKind.GroupVersion().String(), + Kind: controllerKind.Kind, + Name: sj.Name, + UID: sj.UID, + BlockOwnerDeletion: &blockOwnerDeletion, + Controller: &isController, + } +} + // XXX unit test this // getJobFromTemplate makes a Job from a CronJob @@ -199,9 +214,10 @@ func getJobFromTemplate(sj *batchv2alpha1.CronJob, scheduledTime time.Time) (*ba job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Labels: labels, - Annotations: annotations, - Name: name, + Labels: labels, + Annotations: annotations, + Name: name, + OwnerReferences: []metav1.OwnerReference{*newControllerRef(sj)}, }, } if err := api.Scheme.Convert(&sj.Spec.JobTemplate.Spec, &job.Spec, nil); err != nil { @@ -267,3 +283,41 @@ func (o byJobStartTime) Less(i, j int) bool { return (*o[i].Status.StartTime).Before(*o[j].Status.StartTime) } + +// adoptJobs applies missing ControllerRefs to Jobs created by a CronJob. +// +// This should only happen if the Jobs were created by an older version of the +// CronJob controller, since from now on we add ControllerRef upon creation. +// +// CronJob doesn't do actual adoption because it doesn't use label selectors to +// find its Jobs. However, we should apply ControllerRef for potential +// server-side cascading deletion, and to advise other controllers we own these +// objects. +func adoptJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface) error { + var errs []error + controllerRef := newControllerRef(sj) + controllerRefJSON, err := json.Marshal(controllerRef) + if err != nil { + return fmt.Errorf("can't adopt Jobs: failed to marshal ControllerRef %#v: %v", controllerRef, err) + } + + for i := range js { + job := &js[i] + controllerRef := controller.GetControllerOf(job) + if controllerRef != nil { + continue + } + controllerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`, + controllerRefJSON, job.UID) + updatedJob, err := jc.PatchJob(job.Namespace, job.Name, types.StrategicMergePatchType, []byte(controllerRefPatch)) + if err != nil { + // If there's a ResourceVersion or other error, don't bother retrying. + // We will just try again on a subsequent CronJob sync. + errs = append(errs, err) + continue + } + // Save it back to the array for later consumers. + js[i] = *updatedJob + } + return utilerrors.NewAggregate(errs) +} diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go index 66ebffdd04e..a8a439752b6 100644 --- a/pkg/controller/cronjob/utils_test.go +++ b/pkg/controller/cronjob/utils_test.go @@ -17,6 +17,8 @@ limitations under the License. package cronjob import ( + "encoding/json" + "reflect" "strings" "testing" "time" @@ -26,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" + "k8s.io/kubernetes/pkg/controller" ) func TestGetJobFromTemplate(t *testing.T) { @@ -366,5 +369,35 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { t.Errorf("unexpected error") } } - +} + +func TestAdoptJobs(t *testing.T) { + sj := cronJob() + controllerRef := newControllerRef(&sj) + jc := &fakeJobControl{} + jobs := []batchv1.Job{newJob("uid0"), newJob("uid1")} + jobs[0].OwnerReferences = nil + jobs[0].Name = "job0" + jobs[1].OwnerReferences = []metav1.OwnerReference{*controllerRef} + jobs[1].Name = "job1" + + if err := adoptJobs(&sj, jobs, jc); err != nil { + t.Errorf("adoptJobs() error: %v", err) + } + if got, want := len(jc.PatchJobName), 1; got != want { + t.Fatalf("len(PatchJobName) = %v, want %v", got, want) + } + if got, want := jc.PatchJobName[0], "job0"; got != want { + t.Errorf("PatchJobName = %v, want %v", got, want) + } + if got, want := len(jc.Patches), 1; got != want { + t.Fatalf("len(Patches) = %v, want %v", got, want) + } + patch := &batchv1.Job{} + if err := json.Unmarshal(jc.Patches[0], patch); err != nil { + t.Fatalf("Unmarshal error: %v", err) + } + if got, want := controller.GetControllerOf(patch), controllerRef; !reflect.DeepEqual(got, want) { + t.Errorf("ControllerRef = %#v, want %#v", got, want) + } } diff --git a/pkg/registry/batch/cronjob/BUILD b/pkg/registry/batch/cronjob/BUILD index 9c29f3f13ea..81a10e3c559 100644 --- a/pkg/registry/batch/cronjob/BUILD +++ b/pkg/registry/batch/cronjob/BUILD @@ -25,6 +25,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", "//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library", ], @@ -41,6 +42,7 @@ go_test( "//pkg/apis/batch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", ], ) diff --git a/pkg/registry/batch/cronjob/strategy.go b/pkg/registry/batch/cronjob/strategy.go index 4960a758951..a48d74f51ec 100644 --- a/pkg/registry/batch/cronjob/strategy.go +++ b/pkg/registry/batch/cronjob/strategy.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/validation/field" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/api" @@ -41,6 +42,12 @@ type scheduledJobStrategy struct { // Strategy is the default logic that applies when creating and updating CronJob objects. var Strategy = scheduledJobStrategy{api.Scheme, names.SimpleNameGenerator} +// DefaultGarbageCollectionPolicy returns Orphan because that was the default +// behavior before the server-side garbage collection was implemented. +func (scheduledJobStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy { + return rest.OrphanDependents +} + // NamespaceScoped returns true because all scheduled jobs need to be within a namespace. func (scheduledJobStrategy) NamespaceScoped() bool { return true diff --git a/pkg/registry/batch/cronjob/strategy_test.go b/pkg/registry/batch/cronjob/strategy_test.go index 13733d2bc80..42bde0c3541 100644 --- a/pkg/registry/batch/cronjob/strategy_test.go +++ b/pkg/registry/batch/cronjob/strategy_test.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" "k8s.io/kubernetes/pkg/api" apitesting "k8s.io/kubernetes/pkg/api/testing" "k8s.io/kubernetes/pkg/apis/batch" @@ -92,6 +93,13 @@ func TestCronJobStrategy(t *testing.T) { if len(errs) == 0 { t.Errorf("Expected a validation error") } + + // Make sure we correctly implement the interface. + // Otherwise a typo could silently change the default. + var gcds rest.GarbageCollectionDeleteStrategy = Strategy + if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want { + t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want) + } } func TestCronJobStatusStrategy(t *testing.T) { diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index ee19297dd33..3492336aa02 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -72,7 +72,7 @@ func init() { ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "cronjob-controller"}, Rules: []rbac.PolicyRule{ rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("cronjobs").RuleOrDie(), - rbac.NewRule("get", "list", "watch", "create", "update", "delete").Groups(batchGroup).Resources("jobs").RuleOrDie(), + rbac.NewRule("get", "list", "watch", "create", "update", "delete", "patch").Groups(batchGroup).Resources("jobs").RuleOrDie(), rbac.NewRule("update").Groups(batchGroup).Resources("cronjobs/status").RuleOrDie(), rbac.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), eventsRule(), diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index e331d7d3f55..a0e420faac1 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -109,6 +109,7 @@ items: - delete - get - list + - patch - update - watch - apiGroups: diff --git a/test/e2e/cronjob.go b/test/e2e/cronjob.go index f28fa3b25da..d6cadef3a71 100644 --- a/test/e2e/cronjob.go +++ b/test/e2e/cronjob.go @@ -33,6 +33,7 @@ import ( batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/test/e2e/framework" @@ -274,6 +275,53 @@ var _ = framework.KubeDescribe("CronJob", func() { err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name) Expect(err).NotTo(HaveOccurred()) }) + + // Adopt Jobs it owns that don't have ControllerRef yet. + // That is, the Jobs were created by a pre-v1.6.0 master. + It("should adopt Jobs it owns that don't have ControllerRef yet", func() { + By("Creating a cronjob") + cronJob := newTestCronJob("adopt", "*/1 * * * ?", batchv2alpha1.ForbidConcurrent, + sleepCommand, nil) + // Replace cronJob with the one returned from Create() so it has the UID. + // Save Kind since it won't be populated in the returned cronJob. + kind := cronJob.Kind + cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) + Expect(err).NotTo(HaveOccurred()) + cronJob.Kind = kind + + By("Ensuring a Job is running") + err = waitForActiveJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Orphaning a Job") + jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(jobs.Items).To(HaveLen(1)) + job := jobs.Items[0] + framework.UpdateJobFunc(f.ClientSet, f.Namespace.Name, job.Name, func(job *batchv1.Job) { + job.OwnerReferences = nil + }) + + By("Checking that the CronJob readopts the Job") + Expect(wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) { + job, err := framework.GetJob(f.ClientSet, f.Namespace.Name, job.Name) + if err != nil { + return false, err + } + controllerRef := controller.GetControllerOf(job) + if controllerRef == nil { + return false, nil + } + if controllerRef.Kind != cronJob.Kind || controllerRef.Name != cronJob.Name || controllerRef.UID != cronJob.UID { + return false, fmt.Errorf("Job has wrong controllerRef: got %v, want %v", controllerRef, cronJob) + } + return true, nil + })).To(Succeed(), "wait for Job %q to be readopted", job.Name) + + By("Removing CronJob") + err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) }) // newTestCronJob returns a cronjob which does one of several testing behaviors. @@ -285,6 +333,9 @@ func newTestCronJob(name, schedule string, concurrencyPolicy batchv2alpha1.Concu ObjectMeta: metav1.ObjectMeta{ Name: name, }, + TypeMeta: metav1.TypeMeta{ + Kind: "CronJob", + }, Spec: batchv2alpha1.CronJobSpec{ Schedule: schedule, ConcurrencyPolicy: concurrencyPolicy, diff --git a/test/e2e/framework/jobs_util.go b/test/e2e/framework/jobs_util.go index 0862e615284..e333f8ebefb 100644 --- a/test/e2e/framework/jobs_util.go +++ b/test/e2e/framework/jobs_util.go @@ -17,8 +17,10 @@ limitations under the License. package framework import ( + "fmt" "time" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" @@ -120,6 +122,29 @@ func UpdateJob(c clientset.Interface, ns string, job *batch.Job) (*batch.Job, er return c.Batch().Jobs(ns).Update(job) } +// UpdateJobFunc updates the job object. It retries if there is a conflict, throw out error if +// there is any other errors. name is the job name, updateFn is the function updating the +// job object. +func UpdateJobFunc(c clientset.Interface, ns, name string, updateFn func(job *batch.Job)) { + ExpectNoError(wait.Poll(time.Millisecond*500, time.Second*30, func() (bool, error) { + job, err := GetJob(c, ns, name) + if err != nil { + return false, fmt.Errorf("failed to get pod %q: %v", name, err) + } + updateFn(job) + _, err = UpdateJob(c, ns, job) + if err == nil { + Logf("Successfully updated job %q", name) + return true, nil + } + if errors.IsConflict(err) { + Logf("Conflicting update to job %q, re-get and re-update: %v", name, err) + return false, nil + } + return false, fmt.Errorf("failed to update job %q: %v", name, err) + })) +} + // DeleteJob uses c to delete the Job named name in namespace ns. If the returned error is nil, the Job has been // deleted. func DeleteJob(c clientset.Interface, ns, name string) error { diff --git a/test/e2e/generated_clientset.go b/test/e2e/generated_clientset.go index a7e37c2ae8c..35dba8c9cf9 100644 --- a/test/e2e/generated_clientset.go +++ b/test/e2e/generated_clientset.go @@ -324,7 +324,9 @@ var _ = framework.KubeDescribe("Generated release_1_5 clientset", func() { observeCreation(w) By("deleting the cronJob") - if err := cronJobClient.Delete(cronJob.Name, nil); err != nil { + // Use DeletePropagationBackground so the CronJob is really gone when the call returns. + propagationPolicy := metav1.DeletePropagationBackground + if err := cronJobClient.Delete(cronJob.Name, &metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil { framework.Failf("Failed to delete cronJob: %v", err) }