From be1fe95534638cbfdf352981842473d25e5c97d4 Mon Sep 17 00:00:00 2001 From: Anthony Yeh Date: Wed, 19 Apr 2017 15:30:38 -0700 Subject: [PATCH] CronJob: Use PATCH to adopt Jobs. --- pkg/controller/cronjob/injection.go | 29 ++++++++++++++++--- pkg/controller/cronjob/utils.go | 12 ++++++-- pkg/controller/cronjob/utils_test.go | 27 ++++++++++------- .../rbac/bootstrappolicy/controller_policy.go | 2 +- .../testdata/controller-roles.yaml | 1 + 5 files changed, 53 insertions(+), 18 deletions(-) diff --git a/pkg/controller/cronjob/injection.go b/pkg/controller/cronjob/injection.go index ecf1b3c89ce..d0261377b35 100644 --- a/pkg/controller/cronjob/injection.go +++ b/pkg/controller/cronjob/injection.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/v1" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" @@ -63,13 +64,15 @@ func (c *fakeSJControl) UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1. // jobControlInterface is an interface that knows how to add or delete jobs // created as an interface to allow testing. type jobControlInterface interface { - // GetJob retrieves a job + // GetJob retrieves a Job. GetJob(namespace, name string) (*batchv1.Job, error) - // CreateJob creates new jobs according to the spec + // CreateJob creates new Jobs according to the spec. CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) - // UpdateJob updates a job + // UpdateJob updates a Job. UpdateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) - // DeleteJob deletes the job identified by name. + // PatchJob patches a Job. + PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) + // DeleteJob deletes the Job identified by name. // TODO: delete by UID? DeleteJob(namespace string, name string) error } @@ -106,6 +109,10 @@ func (r realJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1. return r.KubeClient.BatchV1().Jobs(namespace).Update(job) } +func (r realJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) { + return r.KubeClient.BatchV1().Jobs(namespace).Patch(name, pt, data, subresources...) +} + func (r realJobControl) CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) { return r.KubeClient.BatchV1().Jobs(namespace).Create(job) } @@ -121,6 +128,8 @@ type fakeJobControl struct { DeleteJobName []string Err error UpdateJobName []string + PatchJobName []string + Patches [][]byte } var _ jobControlInterface = &fakeJobControl{} @@ -156,6 +165,18 @@ func (f *fakeJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1 return job, nil } +func (f *fakeJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return nil, f.Err + } + f.PatchJobName = append(f.PatchJobName, name) + f.Patches = append(f.Patches, data) + // We don't have anything to return. Just return something non-nil. + return &batchv1.Job{}, nil +} + func (f *fakeJobControl) DeleteJob(namespace string, name string) error { f.Lock() defer f.Unlock() diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index 77b14d0e18e..f456c62ad6d 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -295,15 +295,21 @@ func (o byJobStartTime) Less(i, j int) bool { // objects. func adoptJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface) error { var errs []error - sjControllerRef := *newControllerRef(sj) + controllerRef := newControllerRef(sj) + controllerRefJSON, err := json.Marshal(controllerRef) + if err != nil { + return fmt.Errorf("can't adopt Jobs: failed to marshal ControllerRef %#v: %v", controllerRef, err) + } + for i := range js { job := &js[i] controllerRef := controller.GetControllerOf(job) if controllerRef != nil { continue } - job.OwnerReferences = append(job.OwnerReferences, sjControllerRef) - updatedJob, err := jc.UpdateJob(job.Namespace, job) + controllerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`, + controllerRefJSON, job.UID) + updatedJob, err := jc.PatchJob(job.Namespace, job.Name, types.StrategicMergePatchType, []byte(controllerRefPatch)) if err != nil { // If there's a ResourceVersion or other error, don't bother retrying. // We will just try again on a subsequent CronJob sync. diff --git a/pkg/controller/cronjob/utils_test.go b/pkg/controller/cronjob/utils_test.go index 299393fa1f3..a8a439752b6 100644 --- a/pkg/controller/cronjob/utils_test.go +++ b/pkg/controller/cronjob/utils_test.go @@ -17,6 +17,8 @@ limitations under the License. package cronjob import ( + "encoding/json" + "reflect" "strings" "testing" "time" @@ -371,26 +373,31 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) { func TestAdoptJobs(t *testing.T) { sj := cronJob() + controllerRef := newControllerRef(&sj) jc := &fakeJobControl{} jobs := []batchv1.Job{newJob("uid0"), newJob("uid1")} jobs[0].OwnerReferences = nil jobs[0].Name = "job0" - jobs[1].OwnerReferences = []metav1.OwnerReference{*newControllerRef(&sj)} + jobs[1].OwnerReferences = []metav1.OwnerReference{*controllerRef} jobs[1].Name = "job1" if err := adoptJobs(&sj, jobs, jc); err != nil { t.Errorf("adoptJobs() error: %v", err) } - for i := range jobs { - controllerRef := controller.GetControllerOf(&jobs[i]) - if controllerRef == nil { - t.Errorf("Job should have ControllerRef: %#v", jobs[i]) - } + if got, want := len(jc.PatchJobName), 1; got != want { + t.Fatalf("len(PatchJobName) = %v, want %v", got, want) } - if got, want := len(jc.UpdateJobName), 1; got != want { - t.Errorf("len(UpdateJobName) = %v, want %v", got, want) + if got, want := jc.PatchJobName[0], "job0"; got != want { + t.Errorf("PatchJobName = %v, want %v", got, want) } - if got, want := jc.UpdateJobName[0], "job0"; got != want { - t.Errorf("UpdateJobName = %v, want %v", got, want) + if got, want := len(jc.Patches), 1; got != want { + t.Fatalf("len(Patches) = %v, want %v", got, want) + } + patch := &batchv1.Job{} + if err := json.Unmarshal(jc.Patches[0], patch); err != nil { + t.Fatalf("Unmarshal error: %v", err) + } + if got, want := controller.GetControllerOf(patch), controllerRef; !reflect.DeepEqual(got, want) { + t.Errorf("ControllerRef = %#v, want %#v", got, want) } } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index ee19297dd33..3492336aa02 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -72,7 +72,7 @@ func init() { ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "cronjob-controller"}, Rules: []rbac.PolicyRule{ rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("cronjobs").RuleOrDie(), - rbac.NewRule("get", "list", "watch", "create", "update", "delete").Groups(batchGroup).Resources("jobs").RuleOrDie(), + rbac.NewRule("get", "list", "watch", "create", "update", "delete", "patch").Groups(batchGroup).Resources("jobs").RuleOrDie(), rbac.NewRule("update").Groups(batchGroup).Resources("cronjobs/status").RuleOrDie(), rbac.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), eventsRule(), diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index e331d7d3f55..a0e420faac1 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -109,6 +109,7 @@ items: - delete - get - list + - patch - update - watch - apiGroups: