From d446930699e8838312de3924377597ebe47056a4 Mon Sep 17 00:00:00 2001 From: Maciej Szulik Date: Thu, 11 Aug 2016 16:59:40 +0200 Subject: [PATCH] Remove pods along with jobs when Replace ConcurrentPolicy is set --- pkg/controller/scheduledjob/controller.go | 48 ++++++++++- .../scheduledjob/controller_test.go | 25 +++++- pkg/controller/scheduledjob/injection.go | 85 +++++++++++++++++++ 3 files changed, 151 insertions(+), 7 deletions(-) diff --git a/pkg/controller/scheduledjob/controller.go b/pkg/controller/scheduledjob/controller.go index 0ba32b4bc46..4c9c0322d88 100644 --- a/pkg/controller/scheduledjob/controller.go +++ b/pkg/controller/scheduledjob/controller.go @@ -35,12 +35,14 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/runtime" + utilerrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" @@ -52,6 +54,7 @@ type ScheduledJobController struct { kubeClient *client.Client jobControl jobControlInterface sjControl sjControlInterface + podControl podControlInterface recorder record.EventRecorder } @@ -69,6 +72,7 @@ func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobControlle kubeClient: kubeClient, jobControl: realJobControl{KubeClient: kubeClient}, sjControl: &realSJControl{KubeClient: kubeClient}, + podControl: &realPodControl{KubeClient: kubeClient}, recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}), } @@ -112,7 +116,7 @@ func (jm *ScheduledJobController) SyncAll() { glog.Infof("Found %d groups", len(jobsBySj)) for _, sj := range sjs { - SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) + SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) } } @@ -120,7 +124,7 @@ func (jm *ScheduledJobController) SyncAll() { // All known jobs created by "sj" should be included in "js". // The current time is passed in to facilitate testing. // It has no receiver, to facilitate testing. -func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) { +func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) for _, j := range js { @@ -199,8 +203,46 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl } if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent { for _, j := range sj.Status.Active { + // TODO: this should be replaced with server side job deletion + // currently this mimics JobReaper from pkg/kubectl/stop.go glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog) - if err := jc.DeleteJob(j.Namespace, j.Name); err != nil { + job, err := jc.GetJob(j.Namespace, j.Name) + if err != nil { + recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err) + return + } + // scale job down to 0 + if *job.Spec.Parallelism != 0 { + zero := int32(0) + job.Spec.Parallelism = &zero + job, err = jc.UpdateJob(job.Namespace, job) + if err != nil { + recorder.Eventf(&sj, api.EventTypeWarning, "FailedUpdate", "Update job: %v", err) + return + } + } + // remove all pods... + selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector) + options := api.ListOptions{LabelSelector: selector} + podList, err := pc.ListPods(job.Namespace, options) + if err != nil { + recorder.Eventf(&sj, api.EventTypeWarning, "FailedList", "List job-pods: %v", err) + } + errList := []error{} + for _, pod := range podList.Items { + if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil { + // ignores the error when the pod isn't found + if !errors.IsNotFound(err) { + errList = append(errList, err) + } + } + } + if len(errList) != 0 { + recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList)) + return + } + // ... and the job itself + if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) return } diff --git a/pkg/controller/scheduledjob/controller_test.go b/pkg/controller/scheduledjob/controller_test.go index 2f9356c5fda..23c2e75db49 100644 --- a/pkg/controller/scheduledjob/controller_test.go +++ b/pkg/controller/scheduledjob/controller_test.go @@ -97,7 +97,10 @@ func scheduledJob() batch.ScheduledJob { } func jobSpec() batch.JobSpec { + one := int32(1) return batch.JobSpec{ + Parallelism: &one, + Completions: &one, Template: api.PodTemplateSpec{ ObjectMeta: api.ObjectMeta{ Labels: map[string]string{ @@ -196,11 +199,23 @@ func TestSyncOne_RunOrNot(t *testing.T) { sj.Spec.StartingDeadlineSeconds = &tc.deadline } + var ( + job *batch.Job + err error + ) + js := []batch.Job{} if tc.ranPreviously { sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()} sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} + job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time) + if err != nil { + t.Fatalf("Unexpected error creating a job from template: %v", err) + } + job.UID = "1234" + job.Namespace = "" if tc.stillActive { - sj.Status.Active = []api.ObjectReference{{}} + sj.Status.Active = []api.ObjectReference{{UID: job.UID}} + js = append(js, *job) } } else { sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} @@ -209,11 +224,12 @@ func TestSyncOne_RunOrNot(t *testing.T) { } } - jc := &fakeJobControl{} + jc := &fakeJobControl{Job: job} sjc := &fakeSJControl{} + pc := &fakePodControl{} recorder := record.NewFakeRecorder(10) - SyncOne(sj, []batch.Job{}, tc.now, jc, sjc, recorder) + SyncOne(sj, js, tc.now, jc, sjc, pc, recorder) expectedCreates := 0 if tc.expectCreate { expectedCreates = 1 @@ -348,10 +364,11 @@ func TestSyncOne_Status(t *testing.T) { jc := &fakeJobControl{} sjc := &fakeSJControl{} + pc := &fakePodControl{} recorder := record.NewFakeRecorder(10) // Run the code - SyncOne(sj, jobs, tc.now, jc, sjc, recorder) + SyncOne(sj, jobs, tc.now, jc, sjc, pc, recorder) // Status update happens once when ranging through job list, and another one if create jobs. expectUpdates := 1 diff --git a/pkg/controller/scheduledjob/injection.go b/pkg/controller/scheduledjob/injection.go index 9181dd001c9..11f9e5da075 100644 --- a/pkg/controller/scheduledjob/injection.go +++ b/pkg/controller/scheduledjob/injection.go @@ -19,6 +19,7 @@ package scheduledjob import ( "sync" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -60,8 +61,12 @@ func (c *fakeSJControl) UpdateStatus(sj *batch.ScheduledJob) error { // jobControlInterface is an interface that knows how to add or delete jobs // created as an interface to allow testing. type jobControlInterface interface { + // GetJob retrieves a job + GetJob(namespace, name string) (*batch.Job, error) // CreateJob creates new jobs according to the spec CreateJob(namespace string, job *batch.Job) (*batch.Job, error) + // UpdateJob updates a job + UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) // DeleteJob deletes the job identified by name. // TODO: delete by UID? DeleteJob(namespace string, name string) error @@ -91,6 +96,14 @@ func copyAnnotations(template *batch.JobTemplateSpec) labels.Set { return a } +func (r realJobControl) GetJob(namespace, name string) (*batch.Job, error) { + return r.KubeClient.Batch().Jobs(namespace).Get(name) +} + +func (r realJobControl) UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) { + return r.KubeClient.Batch().Jobs(namespace).Update(job) +} + func (r realJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) { return r.KubeClient.Batch().Jobs(namespace).Create(job) } @@ -101,6 +114,7 @@ func (r realJobControl) DeleteJob(namespace string, name string) error { type fakeJobControl struct { sync.Mutex + Job *batch.Job Jobs []batch.Job DeleteJobName []string Err error @@ -119,6 +133,24 @@ func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job return job, nil } +func (f *fakeJobControl) GetJob(namespace, name string) (*batch.Job, error) { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return nil, f.Err + } + return f.Job, nil +} + +func (f *fakeJobControl) UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return nil, f.Err + } + return job, nil +} + func (f *fakeJobControl) DeleteJob(namespace string, name string) error { f.Lock() defer f.Unlock() @@ -136,3 +168,56 @@ func (f *fakeJobControl) Clear() { f.Jobs = []batch.Job{} f.Err = nil } + +// ------------------------------------------------------------------ // + +// podControlInterface is an interface that knows how to list or delete pods +// created as an interface to allow testing. +type podControlInterface interface { + // ListPods list pods + ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) + // DeleteJob deletes the pod identified by name. + // TODO: delete by UID? + DeletePod(namespace string, name string) error +} + +// realPodControl is the default implementation of podControlInterface. +type realPodControl struct { + KubeClient *client.Client + Recorder record.EventRecorder +} + +var _ podControlInterface = &realPodControl{} + +func (r realPodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) { + return r.KubeClient.Pods(namespace).List(opts) +} + +func (r realPodControl) DeletePod(namespace string, name string) error { + return r.KubeClient.Pods(namespace).Delete(name, nil) +} + +type fakePodControl struct { + sync.Mutex + Pods []api.Pod + DeletePodName []string + Err error +} + +var _ podControlInterface = &fakePodControl{} + +func (f *fakePodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) { + f.Lock() + defer f.Unlock() + return &api.PodList{Items: f.Pods}, nil +} + +func (f *fakePodControl) DeletePod(namespace string, name string) error { + f.Lock() + defer f.Unlock() + if f.Err != nil { + return f.Err + } + f.DeletePodName = append(f.DeletePodName, name) + return nil +}