Remove pods along with jobs when Replace ConcurrentPolicy is set

This commit is contained in:
Maciej Szulik 2016-08-11 16:59:40 +02:00
parent e39d7f71e6
commit d446930699
3 changed files with 151 additions and 7 deletions

View File

@ -35,12 +35,14 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -52,6 +54,7 @@ type ScheduledJobController struct {
kubeClient *client.Client
jobControl jobControlInterface
sjControl sjControlInterface
podControl podControlInterface
recorder record.EventRecorder
}
@ -69,6 +72,7 @@ func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobControlle
kubeClient: kubeClient,
jobControl: realJobControl{KubeClient: kubeClient},
sjControl: &realSJControl{KubeClient: kubeClient},
podControl: &realPodControl{KubeClient: kubeClient},
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}),
}
@ -112,7 +116,7 @@ func (jm *ScheduledJobController) SyncAll() {
glog.Infof("Found %d groups", len(jobsBySj))
for _, sj := range sjs {
SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
}
}
@ -120,7 +124,7 @@ func (jm *ScheduledJobController) SyncAll() {
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
for _, j := range js {
@ -199,8 +203,46 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
}
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
for _, j := range sj.Status.Active {
// TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog)
if err := jc.DeleteJob(j.Namespace, j.Name); err != nil {
job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err)
return
}
// scale job down to 0
if *job.Spec.Parallelism != 0 {
zero := int32(0)
job.Spec.Parallelism = &zero
job, err = jc.UpdateJob(job.Namespace, job)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
return
}
}
// remove all pods...
selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
options := api.ListOptions{LabelSelector: selector}
podList, err := pc.ListPods(job.Namespace, options)
if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedList", "List job-pods: %v", err)
}
errList := []error{}
for _, pod := range podList.Items {
if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
// ignores the error when the pod isn't found
if !errors.IsNotFound(err) {
errList = append(errList, err)
}
}
}
if len(errList) != 0 {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
return
}
// ... and the job itself
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
return
}

View File

@ -97,7 +97,10 @@ func scheduledJob() batch.ScheduledJob {
}
func jobSpec() batch.JobSpec {
one := int32(1)
return batch.JobSpec{
Parallelism: &one,
Completions: &one,
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
@ -196,11 +199,23 @@ func TestSyncOne_RunOrNot(t *testing.T) {
sj.Spec.StartingDeadlineSeconds = &tc.deadline
}
var (
job *batch.Job
err error
)
js := []batch.Job{}
if tc.ranPreviously {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()}
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time)
if err != nil {
t.Fatalf("Unexpected error creating a job from template: %v", err)
}
job.UID = "1234"
job.Namespace = ""
if tc.stillActive {
sj.Status.Active = []api.ObjectReference{{}}
sj.Status.Active = []api.ObjectReference{{UID: job.UID}}
js = append(js, *job)
}
} else {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
@ -209,11 +224,12 @@ func TestSyncOne_RunOrNot(t *testing.T) {
}
}
jc := &fakeJobControl{}
jc := &fakeJobControl{Job: job}
sjc := &fakeSJControl{}
pc := &fakePodControl{}
recorder := record.NewFakeRecorder(10)
SyncOne(sj, []batch.Job{}, tc.now, jc, sjc, recorder)
SyncOne(sj, js, tc.now, jc, sjc, pc, recorder)
expectedCreates := 0
if tc.expectCreate {
expectedCreates = 1
@ -348,10 +364,11 @@ func TestSyncOne_Status(t *testing.T) {
jc := &fakeJobControl{}
sjc := &fakeSJControl{}
pc := &fakePodControl{}
recorder := record.NewFakeRecorder(10)
// Run the code
SyncOne(sj, jobs, tc.now, jc, sjc, recorder)
SyncOne(sj, jobs, tc.now, jc, sjc, pc, recorder)
// Status update happens once when ranging through job list, and another one if create jobs.
expectUpdates := 1

View File

@ -19,6 +19,7 @@ package scheduledjob
import (
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -60,8 +61,12 @@ func (c *fakeSJControl) UpdateStatus(sj *batch.ScheduledJob) error {
// jobControlInterface is an interface that knows how to add or delete jobs
// created as an interface to allow testing.
type jobControlInterface interface {
// GetJob retrieves a job
GetJob(namespace, name string) (*batch.Job, error)
// CreateJob creates new jobs according to the spec
CreateJob(namespace string, job *batch.Job) (*batch.Job, error)
// UpdateJob updates a job
UpdateJob(namespace string, job *batch.Job) (*batch.Job, error)
// DeleteJob deletes the job identified by name.
// TODO: delete by UID?
DeleteJob(namespace string, name string) error
@ -91,6 +96,14 @@ func copyAnnotations(template *batch.JobTemplateSpec) labels.Set {
return a
}
func (r realJobControl) GetJob(namespace, name string) (*batch.Job, error) {
return r.KubeClient.Batch().Jobs(namespace).Get(name)
}
func (r realJobControl) UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) {
return r.KubeClient.Batch().Jobs(namespace).Update(job)
}
func (r realJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) {
return r.KubeClient.Batch().Jobs(namespace).Create(job)
}
@ -101,6 +114,7 @@ func (r realJobControl) DeleteJob(namespace string, name string) error {
type fakeJobControl struct {
sync.Mutex
Job *batch.Job
Jobs []batch.Job
DeleteJobName []string
Err error
@ -119,6 +133,24 @@ func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job
return job, nil
}
func (f *fakeJobControl) GetJob(namespace, name string) (*batch.Job, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
return f.Job, nil
}
func (f *fakeJobControl) UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return nil, f.Err
}
return job, nil
}
func (f *fakeJobControl) DeleteJob(namespace string, name string) error {
f.Lock()
defer f.Unlock()
@ -136,3 +168,56 @@ func (f *fakeJobControl) Clear() {
f.Jobs = []batch.Job{}
f.Err = nil
}
// ------------------------------------------------------------------ //
// podControlInterface is an interface that knows how to list or delete pods
// created as an interface to allow testing.
type podControlInterface interface {
// ListPods list pods
ListPods(namespace string, opts api.ListOptions) (*api.PodList, error)
// DeleteJob deletes the pod identified by name.
// TODO: delete by UID?
DeletePod(namespace string, name string) error
}
// realPodControl is the default implementation of podControlInterface.
type realPodControl struct {
KubeClient *client.Client
Recorder record.EventRecorder
}
var _ podControlInterface = &realPodControl{}
func (r realPodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) {
return r.KubeClient.Pods(namespace).List(opts)
}
func (r realPodControl) DeletePod(namespace string, name string) error {
return r.KubeClient.Pods(namespace).Delete(name, nil)
}
type fakePodControl struct {
sync.Mutex
Pods []api.Pod
DeletePodName []string
Err error
}
var _ podControlInterface = &fakePodControl{}
func (f *fakePodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) {
f.Lock()
defer f.Unlock()
return &api.PodList{Items: f.Pods}, nil
}
func (f *fakePodControl) DeletePod(namespace string, name string) error {
f.Lock()
defer f.Unlock()
if f.Err != nil {
return f.Err
}
f.DeletePodName = append(f.DeletePodName, name)
return nil
}