Merge pull request #42177 from enisoc/controller-ref-cronjob

Automatic merge from submit-queue (batch tested with PRs 42177, 42176, 44721)

CronJob: Respect ControllerRef

**What this PR does / why we need it**:

This is part of the completion of the [ControllerRef](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/controller-ref.md) proposal. It brings CronJob into compliance with ControllerRef. See the individual commit messages for details.

**Which issue this PR fixes**:

This ensures that other controllers do not fight over control of objects that a CronJob owns.

**Special notes for your reviewer**:

**Release note**:

```release-note
CronJob controller now respects ControllerRef to avoid fighting with other controllers.
```
cc @erictune @kubernetes/sig-apps-pr-reviews
This commit is contained in:
Kubernetes Submit Queue 2017-04-20 12:57:03 -07:00 committed by GitHub
commit f25a657574
14 changed files with 322 additions and 64 deletions

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/batch/v2alpha1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/robfig/cron:go_default_library",
@ -54,6 +55,7 @@ go_test(
"//pkg/api/v1:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/batch/v2alpha1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",

View File

@ -56,6 +56,9 @@ import (
// Utilities for dealing with Jobs and CronJobs and time.
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batchv2alpha1.SchemeGroupVersion.WithKind("CronJob")
type CronJobController struct {
kubeClient clientset.Interface
jobControl jobControlInterface
@ -102,22 +105,26 @@ func (jm *CronJobController) Run(stopCh <-chan struct{}) {
// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *CronJobController) syncAll() {
sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
glog.Errorf("Error listing cronjobs: %v", err)
return
}
sjs := sjl.Items
glog.V(4).Infof("Found %d cronjobs", len(sjs))
// List children (Jobs) before parents (CronJob).
// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
// Note that this only works because we are NOT using any caches here.
jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
glog.Errorf("Error listing jobs")
utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
return
}
js := jl.Items
glog.V(4).Infof("Found %d jobs", len(js))
sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
return
}
sjs := sjl.Items
glog.V(4).Infof("Found %d cronjobs", len(sjs))
jobsBySj := groupJobsByParent(js)
glog.V(4).Infof("Found %d groups", len(jobsBySj))
@ -238,6 +245,18 @@ func syncOne(sj *batchv2alpha1.CronJob, js []batchv1.Job, now time.Time, jc jobC
}
*sj = *updatedSJ
if sj.DeletionTimestamp != nil {
// The CronJob is being deleted.
// Don't do anything other than updating status.
return
}
if err := adoptJobs(sj, js, jc); err != nil {
// This is fine. We will retry later.
// Adoption is only to advise other controllers. We don't rely on it.
glog.V(4).Infof("Unable to adopt Jobs for CronJob %v: %v", nameForLog, err)
}
if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
"k8s.io/kubernetes/pkg/controller"
)
// schedule is hourly on the hour
@ -289,6 +290,29 @@ func TestSyncOne_RunOrNot(t *testing.T) {
if len(jc.Jobs) != expectedCreates {
t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs))
}
for i := range jc.Jobs {
job := &jc.Jobs[i]
controllerRef := controller.GetControllerOf(job)
if controllerRef == nil {
t.Errorf("%s: expected job to have ControllerRef: %#v", name, job)
} else {
if got, want := controllerRef.APIVersion, "batch/v2alpha1"; got != want {
t.Errorf("%s: controllerRef.APIVersion = %q, want %q", name, got, want)
}
if got, want := controllerRef.Kind, "CronJob"; got != want {
t.Errorf("%s: controllerRef.Kind = %q, want %q", name, got, want)
}
if got, want := controllerRef.Name, sj.Name; got != want {
t.Errorf("%s: controllerRef.Name = %q, want %q", name, got, want)
}
if got, want := controllerRef.UID, sj.UID; got != want {
t.Errorf("%s: controllerRef.UID = %q, want %q", name, got, want)
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
t.Errorf("%s: controllerRef.Controller is not set to true", name)
}
}
}
expectedDeletes := 0
if tc.expectDelete {
@ -583,58 +607,61 @@ func TestSyncOne_Status(t *testing.T) {
now time.Time
hasUnexpectedJob bool
hasMissingJob bool
beingDeleted bool
// expectations
expectCreate bool
expectDelete bool
}{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, T, F},
"never ran, is time, deleting": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F, F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F},
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F},
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F},
"prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F},
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F},
"prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F},
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F},
"prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F, F},
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F},
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F},
"prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F, F},
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F, F},
"prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F, F},
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F, F},
"prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F, F},
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, deleting": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F, F, F},
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F, F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, F, F, T, F},
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, F, T, F},
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, F, T, F},
}
for name, tc := range testCases {
@ -674,6 +701,10 @@ func TestSyncOne_Status(t *testing.T) {
}
sj.Status.Active = append(sj.Status.Active, *ref)
}
if tc.beingDeleted {
timestamp := metav1.NewTime(tc.now)
sj.DeletionTimestamp = &timestamp
}
jc := &fakeJobControl{}
sjc := &fakeSJControl{}

View File

@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
@ -63,13 +64,15 @@ func (c *fakeSJControl) UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1.
// jobControlInterface is an interface that knows how to add or delete jobs
// created as an interface to allow testing.
type jobControlInterface interface {
// GetJob retrieves a job
// GetJob retrieves a Job.
GetJob(namespace, name string) (*batchv1.Job, error)
// CreateJob creates new jobs according to the spec
// CreateJob creates new Jobs according to the spec.
CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error)
// UpdateJob updates a job
// UpdateJob updates a Job.
UpdateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error)
// DeleteJob deletes the job identified by name.
// PatchJob patches a Job.
PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error)
// DeleteJob deletes the Job identified by name.
// TODO: delete by UID?
DeleteJob(namespace string, name string) error
}
@ -106,6 +109,10 @@ func (r realJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1.
return r.KubeClient.BatchV1().Jobs(namespace).Update(job)
}
func (r realJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) {
return r.KubeClient.BatchV1().Jobs(namespace).Patch(name, pt, data, subresources...)
}
func (r realJobControl) CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) {
return r.KubeClient.BatchV1().Jobs(namespace).Create(job)
}
@ -120,6 +127,9 @@ type fakeJobControl struct {
Jobs []batchv1.Job
DeleteJobName []string
Err error
UpdateJobName []string
PatchJobName []string
Patches [][]byte
}
var _ jobControlInterface = &fakeJobControl{}
@ -151,9 +161,22 @@ func (f *fakeJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1
if f.Err != nil {
return nil, f.Err
}
f.UpdateJobName = append(f.UpdateJobName, job.Name)
return job, nil
}
func (f *fakeJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
f.PatchJobName = append(f.PatchJobName, name)
f.Patches = append(f.Patches, data)
// We don't have anything to return. Just return something non-nil.
return &batchv1.Job{}, nil
}
func (f *fakeJobControl) DeleteJob(namespace string, name string) error {
f.Lock()
defer f.Unlock()

View File

@ -28,11 +28,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/v1/ref"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
"k8s.io/kubernetes/pkg/controller"
)
// Utilities for dealing with Jobs and CronJobs and time.
@ -180,6 +182,19 @@ func getRecentUnmetScheduleTimes(sj batchv2alpha1.CronJob, now time.Time) ([]tim
return starts, nil
}
func newControllerRef(sj *batchv2alpha1.CronJob) *metav1.OwnerReference {
blockOwnerDeletion := true
isController := true
return &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: sj.Name,
UID: sj.UID,
BlockOwnerDeletion: &blockOwnerDeletion,
Controller: &isController,
}
}
// XXX unit test this
// getJobFromTemplate makes a Job from a CronJob
@ -199,9 +214,10 @@ func getJobFromTemplate(sj *batchv2alpha1.CronJob, scheduledTime time.Time) (*ba
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
Annotations: annotations,
Name: name,
Labels: labels,
Annotations: annotations,
Name: name,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(sj)},
},
}
if err := api.Scheme.Convert(&sj.Spec.JobTemplate.Spec, &job.Spec, nil); err != nil {
@ -267,3 +283,41 @@ func (o byJobStartTime) Less(i, j int) bool {
return (*o[i].Status.StartTime).Before(*o[j].Status.StartTime)
}
// adoptJobs applies missing ControllerRefs to Jobs created by a CronJob.
//
// This should only happen if the Jobs were created by an older version of the
// CronJob controller, since from now on we add ControllerRef upon creation.
//
// CronJob doesn't do actual adoption because it doesn't use label selectors to
// find its Jobs. However, we should apply ControllerRef for potential
// server-side cascading deletion, and to advise other controllers we own these
// objects.
func adoptJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface) error {
var errs []error
controllerRef := newControllerRef(sj)
controllerRefJSON, err := json.Marshal(controllerRef)
if err != nil {
return fmt.Errorf("can't adopt Jobs: failed to marshal ControllerRef %#v: %v", controllerRef, err)
}
for i := range js {
job := &js[i]
controllerRef := controller.GetControllerOf(job)
if controllerRef != nil {
continue
}
controllerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`,
controllerRefJSON, job.UID)
updatedJob, err := jc.PatchJob(job.Namespace, job.Name, types.StrategicMergePatchType, []byte(controllerRefPatch))
if err != nil {
// If there's a ResourceVersion or other error, don't bother retrying.
// We will just try again on a subsequent CronJob sync.
errs = append(errs, err)
continue
}
// Save it back to the array for later consumers.
js[i] = *updatedJob
}
return utilerrors.NewAggregate(errs)
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package cronjob
import (
"encoding/json"
"reflect"
"strings"
"testing"
"time"
@ -26,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
"k8s.io/kubernetes/pkg/controller"
)
func TestGetJobFromTemplate(t *testing.T) {
@ -366,5 +369,35 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
t.Errorf("unexpected error")
}
}
}
func TestAdoptJobs(t *testing.T) {
sj := cronJob()
controllerRef := newControllerRef(&sj)
jc := &fakeJobControl{}
jobs := []batchv1.Job{newJob("uid0"), newJob("uid1")}
jobs[0].OwnerReferences = nil
jobs[0].Name = "job0"
jobs[1].OwnerReferences = []metav1.OwnerReference{*controllerRef}
jobs[1].Name = "job1"
if err := adoptJobs(&sj, jobs, jc); err != nil {
t.Errorf("adoptJobs() error: %v", err)
}
if got, want := len(jc.PatchJobName), 1; got != want {
t.Fatalf("len(PatchJobName) = %v, want %v", got, want)
}
if got, want := jc.PatchJobName[0], "job0"; got != want {
t.Errorf("PatchJobName = %v, want %v", got, want)
}
if got, want := len(jc.Patches), 1; got != want {
t.Fatalf("len(Patches) = %v, want %v", got, want)
}
patch := &batchv1.Job{}
if err := json.Unmarshal(jc.Patches[0], patch); err != nil {
t.Fatalf("Unmarshal error: %v", err)
}
if got, want := controller.GetControllerOf(patch), controllerRef; !reflect.DeepEqual(got, want) {
t.Errorf("ControllerRef = %#v, want %#v", got, want)
}
}

View File

@ -25,6 +25,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library",
],
@ -41,6 +42,7 @@ go_test(
"//pkg/apis/batch:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
],
)

View File

@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/kubernetes/pkg/api"
@ -41,6 +42,12 @@ type scheduledJobStrategy struct {
// Strategy is the default logic that applies when creating and updating CronJob objects.
var Strategy = scheduledJobStrategy{api.Scheme, names.SimpleNameGenerator}
// DefaultGarbageCollectionPolicy returns Orphan because that was the default
// behavior before the server-side garbage collection was implemented.
func (scheduledJobStrategy) DefaultGarbageCollectionPolicy() rest.GarbageCollectionPolicy {
return rest.OrphanDependents
}
// NamespaceScoped returns true because all scheduled jobs need to be within a namespace.
func (scheduledJobStrategy) NamespaceScoped() bool {
return true

View File

@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api"
apitesting "k8s.io/kubernetes/pkg/api/testing"
"k8s.io/kubernetes/pkg/apis/batch"
@ -92,6 +93,13 @@ func TestCronJobStrategy(t *testing.T) {
if len(errs) == 0 {
t.Errorf("Expected a validation error")
}
// Make sure we correctly implement the interface.
// Otherwise a typo could silently change the default.
var gcds rest.GarbageCollectionDeleteStrategy = Strategy
if got, want := gcds.DefaultGarbageCollectionPolicy(), rest.OrphanDependents; got != want {
t.Errorf("DefaultGarbageCollectionPolicy() = %#v, want %#v", got, want)
}
}
func TestCronJobStatusStrategy(t *testing.T) {

View File

@ -72,7 +72,7 @@ func init() {
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "cronjob-controller"},
Rules: []rbac.PolicyRule{
rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("cronjobs").RuleOrDie(),
rbac.NewRule("get", "list", "watch", "create", "update", "delete").Groups(batchGroup).Resources("jobs").RuleOrDie(),
rbac.NewRule("get", "list", "watch", "create", "update", "delete", "patch").Groups(batchGroup).Resources("jobs").RuleOrDie(),
rbac.NewRule("update").Groups(batchGroup).Resources("cronjobs/status").RuleOrDie(),
rbac.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
eventsRule(),

View File

@ -109,6 +109,7 @@ items:
- delete
- get
- list
- patch
- update
- watch
- apiGroups:

View File

@ -33,6 +33,7 @@ import (
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/test/e2e/framework"
@ -274,6 +275,53 @@ var _ = framework.KubeDescribe("CronJob", func() {
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
// Adopt Jobs it owns that don't have ControllerRef yet.
// That is, the Jobs were created by a pre-v1.6.0 master.
It("should adopt Jobs it owns that don't have ControllerRef yet", func() {
By("Creating a cronjob")
cronJob := newTestCronJob("adopt", "*/1 * * * ?", batchv2alpha1.ForbidConcurrent,
sleepCommand, nil)
// Replace cronJob with the one returned from Create() so it has the UID.
// Save Kind since it won't be populated in the returned cronJob.
kind := cronJob.Kind
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
cronJob.Kind = kind
By("Ensuring a Job is running")
err = waitForActiveJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
Expect(err).NotTo(HaveOccurred())
By("Orphaning a Job")
jobs, err := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(jobs.Items).To(HaveLen(1))
job := jobs.Items[0]
framework.UpdateJobFunc(f.ClientSet, f.Namespace.Name, job.Name, func(job *batchv1.Job) {
job.OwnerReferences = nil
})
By("Checking that the CronJob readopts the Job")
Expect(wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
job, err := framework.GetJob(f.ClientSet, f.Namespace.Name, job.Name)
if err != nil {
return false, err
}
controllerRef := controller.GetControllerOf(job)
if controllerRef == nil {
return false, nil
}
if controllerRef.Kind != cronJob.Kind || controllerRef.Name != cronJob.Name || controllerRef.UID != cronJob.UID {
return false, fmt.Errorf("Job has wrong controllerRef: got %v, want %v", controllerRef, cronJob)
}
return true, nil
})).To(Succeed(), "wait for Job %q to be readopted", job.Name)
By("Removing CronJob")
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
})
// newTestCronJob returns a cronjob which does one of several testing behaviors.
@ -285,6 +333,9 @@ func newTestCronJob(name, schedule string, concurrencyPolicy batchv2alpha1.Concu
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
TypeMeta: metav1.TypeMeta{
Kind: "CronJob",
},
Spec: batchv2alpha1.CronJobSpec{
Schedule: schedule,
ConcurrencyPolicy: concurrencyPolicy,

View File

@ -17,8 +17,10 @@ limitations under the License.
package framework
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
@ -120,6 +122,29 @@ func UpdateJob(c clientset.Interface, ns string, job *batch.Job) (*batch.Job, er
return c.Batch().Jobs(ns).Update(job)
}
// UpdateJobFunc updates the job object. It retries if there is a conflict, throw out error if
// there is any other errors. name is the job name, updateFn is the function updating the
// job object.
func UpdateJobFunc(c clientset.Interface, ns, name string, updateFn func(job *batch.Job)) {
ExpectNoError(wait.Poll(time.Millisecond*500, time.Second*30, func() (bool, error) {
job, err := GetJob(c, ns, name)
if err != nil {
return false, fmt.Errorf("failed to get pod %q: %v", name, err)
}
updateFn(job)
_, err = UpdateJob(c, ns, job)
if err == nil {
Logf("Successfully updated job %q", name)
return true, nil
}
if errors.IsConflict(err) {
Logf("Conflicting update to job %q, re-get and re-update: %v", name, err)
return false, nil
}
return false, fmt.Errorf("failed to update job %q: %v", name, err)
}))
}
// DeleteJob uses c to delete the Job named name in namespace ns. If the returned error is nil, the Job has been
// deleted.
func DeleteJob(c clientset.Interface, ns, name string) error {

View File

@ -324,7 +324,9 @@ var _ = framework.KubeDescribe("Generated release_1_5 clientset", func() {
observeCreation(w)
By("deleting the cronJob")
if err := cronJobClient.Delete(cronJob.Name, nil); err != nil {
// Use DeletePropagationBackground so the CronJob is really gone when the call returns.
propagationPolicy := metav1.DeletePropagationBackground
if err := cronJobClient.Delete(cronJob.Name, &metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}); err != nil {
framework.Failf("Failed to delete cronJob: %v", err)
}