CronJob: Use PATCH to adopt Jobs.

This commit is contained in:
Anthony Yeh 2017-04-19 15:30:38 -07:00
parent 33d7788793
commit be1fe95534
5 changed files with 53 additions and 18 deletions

View File

@ -22,6 +22,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1" batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
@ -63,13 +64,15 @@ func (c *fakeSJControl) UpdateStatus(sj *batchv2alpha1.CronJob) (*batchv2alpha1.
// jobControlInterface is an interface that knows how to add or delete jobs // jobControlInterface is an interface that knows how to add or delete jobs
// created as an interface to allow testing. // created as an interface to allow testing.
type jobControlInterface interface { type jobControlInterface interface {
// GetJob retrieves a job // GetJob retrieves a Job.
GetJob(namespace, name string) (*batchv1.Job, error) GetJob(namespace, name string) (*batchv1.Job, error)
// CreateJob creates new jobs according to the spec // CreateJob creates new Jobs according to the spec.
CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error)
// UpdateJob updates a job // UpdateJob updates a Job.
UpdateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) UpdateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error)
// DeleteJob deletes the job identified by name. // PatchJob patches a Job.
PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error)
// DeleteJob deletes the Job identified by name.
// TODO: delete by UID? // TODO: delete by UID?
DeleteJob(namespace string, name string) error DeleteJob(namespace string, name string) error
} }
@ -106,6 +109,10 @@ func (r realJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1.
return r.KubeClient.BatchV1().Jobs(namespace).Update(job) return r.KubeClient.BatchV1().Jobs(namespace).Update(job)
} }
func (r realJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) {
return r.KubeClient.BatchV1().Jobs(namespace).Patch(name, pt, data, subresources...)
}
func (r realJobControl) CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) { func (r realJobControl) CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error) {
return r.KubeClient.BatchV1().Jobs(namespace).Create(job) return r.KubeClient.BatchV1().Jobs(namespace).Create(job)
} }
@ -121,6 +128,8 @@ type fakeJobControl struct {
DeleteJobName []string DeleteJobName []string
Err error Err error
UpdateJobName []string UpdateJobName []string
PatchJobName []string
Patches [][]byte
} }
var _ jobControlInterface = &fakeJobControl{} var _ jobControlInterface = &fakeJobControl{}
@ -156,6 +165,18 @@ func (f *fakeJobControl) UpdateJob(namespace string, job *batchv1.Job) (*batchv1
return job, nil return job, nil
} }
func (f *fakeJobControl) PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
f.PatchJobName = append(f.PatchJobName, name)
f.Patches = append(f.Patches, data)
// We don't have anything to return. Just return something non-nil.
return &batchv1.Job{}, nil
}
func (f *fakeJobControl) DeleteJob(namespace string, name string) error { func (f *fakeJobControl) DeleteJob(namespace string, name string) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()

View File

@ -295,15 +295,21 @@ func (o byJobStartTime) Less(i, j int) bool {
// objects. // objects.
func adoptJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface) error { func adoptJobs(sj *batchv2alpha1.CronJob, js []batchv1.Job, jc jobControlInterface) error {
var errs []error var errs []error
sjControllerRef := *newControllerRef(sj) controllerRef := newControllerRef(sj)
controllerRefJSON, err := json.Marshal(controllerRef)
if err != nil {
return fmt.Errorf("can't adopt Jobs: failed to marshal ControllerRef %#v: %v", controllerRef, err)
}
for i := range js { for i := range js {
job := &js[i] job := &js[i]
controllerRef := controller.GetControllerOf(job) controllerRef := controller.GetControllerOf(job)
if controllerRef != nil { if controllerRef != nil {
continue continue
} }
job.OwnerReferences = append(job.OwnerReferences, sjControllerRef) controllerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`,
updatedJob, err := jc.UpdateJob(job.Namespace, job) controllerRefJSON, job.UID)
updatedJob, err := jc.PatchJob(job.Namespace, job.Name, types.StrategicMergePatchType, []byte(controllerRefPatch))
if err != nil { if err != nil {
// If there's a ResourceVersion or other error, don't bother retrying. // If there's a ResourceVersion or other error, don't bother retrying.
// We will just try again on a subsequent CronJob sync. // We will just try again on a subsequent CronJob sync.

View File

@ -17,6 +17,8 @@ limitations under the License.
package cronjob package cronjob
import ( import (
"encoding/json"
"reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -371,26 +373,31 @@ func TestGetRecentUnmetScheduleTimes(t *testing.T) {
func TestAdoptJobs(t *testing.T) { func TestAdoptJobs(t *testing.T) {
sj := cronJob() sj := cronJob()
controllerRef := newControllerRef(&sj)
jc := &fakeJobControl{} jc := &fakeJobControl{}
jobs := []batchv1.Job{newJob("uid0"), newJob("uid1")} jobs := []batchv1.Job{newJob("uid0"), newJob("uid1")}
jobs[0].OwnerReferences = nil jobs[0].OwnerReferences = nil
jobs[0].Name = "job0" jobs[0].Name = "job0"
jobs[1].OwnerReferences = []metav1.OwnerReference{*newControllerRef(&sj)} jobs[1].OwnerReferences = []metav1.OwnerReference{*controllerRef}
jobs[1].Name = "job1" jobs[1].Name = "job1"
if err := adoptJobs(&sj, jobs, jc); err != nil { if err := adoptJobs(&sj, jobs, jc); err != nil {
t.Errorf("adoptJobs() error: %v", err) t.Errorf("adoptJobs() error: %v", err)
} }
for i := range jobs { if got, want := len(jc.PatchJobName), 1; got != want {
controllerRef := controller.GetControllerOf(&jobs[i]) t.Fatalf("len(PatchJobName) = %v, want %v", got, want)
if controllerRef == nil {
t.Errorf("Job should have ControllerRef: %#v", jobs[i])
}
} }
if got, want := len(jc.UpdateJobName), 1; got != want { if got, want := jc.PatchJobName[0], "job0"; got != want {
t.Errorf("len(UpdateJobName) = %v, want %v", got, want) t.Errorf("PatchJobName = %v, want %v", got, want)
} }
if got, want := jc.UpdateJobName[0], "job0"; got != want { if got, want := len(jc.Patches), 1; got != want {
t.Errorf("UpdateJobName = %v, want %v", got, want) t.Fatalf("len(Patches) = %v, want %v", got, want)
}
patch := &batchv1.Job{}
if err := json.Unmarshal(jc.Patches[0], patch); err != nil {
t.Fatalf("Unmarshal error: %v", err)
}
if got, want := controller.GetControllerOf(patch), controllerRef; !reflect.DeepEqual(got, want) {
t.Errorf("ControllerRef = %#v, want %#v", got, want)
} }
} }

View File

@ -72,7 +72,7 @@ func init() {
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "cronjob-controller"}, ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "cronjob-controller"},
Rules: []rbac.PolicyRule{ Rules: []rbac.PolicyRule{
rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("cronjobs").RuleOrDie(), rbac.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("cronjobs").RuleOrDie(),
rbac.NewRule("get", "list", "watch", "create", "update", "delete").Groups(batchGroup).Resources("jobs").RuleOrDie(), rbac.NewRule("get", "list", "watch", "create", "update", "delete", "patch").Groups(batchGroup).Resources("jobs").RuleOrDie(),
rbac.NewRule("update").Groups(batchGroup).Resources("cronjobs/status").RuleOrDie(), rbac.NewRule("update").Groups(batchGroup).Resources("cronjobs/status").RuleOrDie(),
rbac.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), rbac.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
eventsRule(), eventsRule(),

View File

@ -109,6 +109,7 @@ items:
- delete - delete
- get - get
- list - list
- patch
- update - update
- watch - watch
- apiGroups: - apiGroups: