Merge pull request #30451 from soltysh/isssue30442

Automatic merge from submit-queue.

Remove pods along with jobs when the Replace concurrency policy is set. Fixes #30442. This builds on #30327 and needs a bit more love in tests. @janetkuo @erictune fyi

Commit 85c91eb332
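In short: when a ScheduledJob uses the Replace concurrency policy and a previous job is still active at the next scheduled start time, the controller now tears the old job down completely, scaling it to zero, deleting its pods, and only then deleting the job object, mimicking the JobReaper from pkg/kubectl/stop.go until server-side job deletion exists. Condensed from the diff below, with error handling and event recording elided (jc and pc are the jobControlInterface and podControlInterface arguments of SyncOne; j ranges over sj.Status.Active):

	// For each still-active job j referenced by the ScheduledJob:
	job, _ := jc.GetJob(j.Namespace, j.Name)

	// Scale the job down to zero so it stops creating replacement pods.
	if *job.Spec.Parallelism != 0 {
		zero := int32(0)
		job.Spec.Parallelism = &zero
		job, _ = jc.UpdateJob(job.Namespace, job)
	}

	// Delete every pod matching the job's selector, ignoring "not found" errors.
	selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
	podList, _ := pc.ListPods(job.Namespace, api.ListOptions{LabelSelector: selector})
	for _, pod := range podList.Items {
		_ = pc.DeletePod(pod.Namespace, pod.Name)
	}

	// Only then delete the job object itself.
	_ = jc.DeleteJob(job.Namespace, job.Name)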
@@ -35,12 +35,14 @@ import (
 	"github.com/golang/glog"

 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	"k8s.io/kubernetes/pkg/client/record"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/runtime"
+	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -52,6 +54,7 @@ type ScheduledJobController struct {
 	kubeClient *client.Client
 	jobControl jobControlInterface
 	sjControl  sjControlInterface
+	podControl podControlInterface
 	recorder   record.EventRecorder
 }

@@ -69,6 +72,7 @@ func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobControlle
 		kubeClient: kubeClient,
 		jobControl: realJobControl{KubeClient: kubeClient},
 		sjControl:  &realSJControl{KubeClient: kubeClient},
+		podControl: &realPodControl{KubeClient: kubeClient},
 		recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduled-job-controller"}),
 	}

@@ -112,7 +116,7 @@ func (jm *ScheduledJobController) SyncAll() {
 	glog.Infof("Found %d groups", len(jobsBySj))

 	for _, sj := range sjs {
-		SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
+		SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
 	}
 }

@@ -120,7 +124,7 @@ func (jm *ScheduledJobController) SyncAll() {
 // All known jobs created by "sj" should be included in "js".
 // The current time is passed in to facilitate testing.
 // It has no receiver, to facilitate testing.
-func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
+func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

 	for _, j := range js {
@@ -199,8 +203,46 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
 	}
 	if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
 		for _, j := range sj.Status.Active {
+			// TODO: this should be replaced with server side job deletion
+			// currently this mimics JobReaper from pkg/kubectl/stop.go
 			glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog)
-			if err := jc.DeleteJob(j.Namespace, j.Name); err != nil {
+			job, err := jc.GetJob(j.Namespace, j.Name)
+			if err != nil {
+				recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err)
+				return
+			}
+			// scale job down to 0
+			if *job.Spec.Parallelism != 0 {
+				zero := int32(0)
+				job.Spec.Parallelism = &zero
+				job, err = jc.UpdateJob(job.Namespace, job)
+				if err != nil {
+					recorder.Eventf(&sj, api.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
+					return
+				}
+			}
+			// remove all pods...
+			selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
+			options := api.ListOptions{LabelSelector: selector}
+			podList, err := pc.ListPods(job.Namespace, options)
+			if err != nil {
+				recorder.Eventf(&sj, api.EventTypeWarning, "FailedList", "List job-pods: %v", err)
+			}
+			errList := []error{}
+			for _, pod := range podList.Items {
+				if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
+					// ignores the error when the pod isn't found
+					if !errors.IsNotFound(err) {
+						errList = append(errList, err)
+					}
+				}
+			}
+			if len(errList) != 0 {
+				recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
+				return
+			}
+			// ... and the job itself
+			if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
 				recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
 				return
 			}
@@ -97,7 +97,10 @@ func scheduledJob() batch.ScheduledJob {
 }

 func jobSpec() batch.JobSpec {
+	one := int32(1)
 	return batch.JobSpec{
+		Parallelism: &one,
+		Completions: &one,
 		Template: api.PodTemplateSpec{
 			ObjectMeta: api.ObjectMeta{
 				Labels: map[string]string{
@@ -196,11 +199,23 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 			sj.Spec.StartingDeadlineSeconds = &tc.deadline
 		}

+		var (
+			job *batch.Job
+			err error
+		)
+		js := []batch.Job{}
 		if tc.ranPreviously {
 			sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeThePriorHour()}
 			sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
+			job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time)
+			if err != nil {
+				t.Fatalf("Unexpected error creating a job from template: %v", err)
+			}
+			job.UID = "1234"
+			job.Namespace = ""
 			if tc.stillActive {
-				sj.Status.Active = []api.ObjectReference{{}}
+				sj.Status.Active = []api.ObjectReference{{UID: job.UID}}
+				js = append(js, *job)
 			}
 		} else {
 			sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
@@ -209,11 +224,12 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 			}
 		}

-		jc := &fakeJobControl{}
+		jc := &fakeJobControl{Job: job}
 		sjc := &fakeSJControl{}
+		pc := &fakePodControl{}
 		recorder := record.NewFakeRecorder(10)

-		SyncOne(sj, []batch.Job{}, tc.now, jc, sjc, recorder)
+		SyncOne(sj, js, tc.now, jc, sjc, pc, recorder)
 		expectedCreates := 0
 		if tc.expectCreate {
 			expectedCreates = 1
@@ -348,10 +364,11 @@ func TestSyncOne_Status(t *testing.T) {

 		jc := &fakeJobControl{}
 		sjc := &fakeSJControl{}
+		pc := &fakePodControl{}
 		recorder := record.NewFakeRecorder(10)

 		// Run the code
-		SyncOne(sj, jobs, tc.now, jc, sjc, recorder)
+		SyncOne(sj, jobs, tc.now, jc, sjc, pc, recorder)

 		// Status update happens once when ranging through job list, and another one if create jobs.
 		expectUpdates := 1
@@ -19,6 +19,7 @@ package scheduledjob
 import (
 	"sync"

+	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/apis/batch"
 	"k8s.io/kubernetes/pkg/client/record"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -60,8 +61,12 @@ func (c *fakeSJControl) UpdateStatus(sj *batch.ScheduledJob) error {
 // jobControlInterface is an interface that knows how to add or delete jobs
 // created as an interface to allow testing.
 type jobControlInterface interface {
+	// GetJob retrieves a job
+	GetJob(namespace, name string) (*batch.Job, error)
 	// CreateJob creates new jobs according to the spec
 	CreateJob(namespace string, job *batch.Job) (*batch.Job, error)
+	// UpdateJob updates a job
+	UpdateJob(namespace string, job *batch.Job) (*batch.Job, error)
 	// DeleteJob deletes the job identified by name.
 	// TODO: delete by UID?
 	DeleteJob(namespace string, name string) error
@@ -91,6 +96,14 @@ func copyAnnotations(template *batch.JobTemplateSpec) labels.Set {
 	return a
 }

+func (r realJobControl) GetJob(namespace, name string) (*batch.Job, error) {
+	return r.KubeClient.Batch().Jobs(namespace).Get(name)
+}
+
+func (r realJobControl) UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) {
+	return r.KubeClient.Batch().Jobs(namespace).Update(job)
+}
+
 func (r realJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job, error) {
 	return r.KubeClient.Batch().Jobs(namespace).Create(job)
 }
@@ -101,6 +114,7 @@ func (r realJobControl) DeleteJob(namespace string, name string) error {

 type fakeJobControl struct {
 	sync.Mutex
+	Job           *batch.Job
 	Jobs          []batch.Job
 	DeleteJobName []string
 	Err           error
@@ -119,6 +133,24 @@ func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job
 	return job, nil
 }

+func (f *fakeJobControl) GetJob(namespace, name string) (*batch.Job, error) {
+	f.Lock()
+	defer f.Unlock()
+	if f.Err != nil {
+		return nil, f.Err
+	}
+	return f.Job, nil
+}
+
+func (f *fakeJobControl) UpdateJob(namespace string, job *batch.Job) (*batch.Job, error) {
+	f.Lock()
+	defer f.Unlock()
+	if f.Err != nil {
+		return nil, f.Err
+	}
+	return job, nil
+}
+
 func (f *fakeJobControl) DeleteJob(namespace string, name string) error {
 	f.Lock()
 	defer f.Unlock()
@@ -136,3 +168,56 @@ func (f *fakeJobControl) Clear() {
 	f.Jobs = []batch.Job{}
 	f.Err = nil
 }
+
+// ------------------------------------------------------------------ //
+
+// podControlInterface is an interface that knows how to list or delete pods
+// created as an interface to allow testing.
+type podControlInterface interface {
+	// ListPods list pods
+	ListPods(namespace string, opts api.ListOptions) (*api.PodList, error)
+	// DeleteJob deletes the pod identified by name.
+	// TODO: delete by UID?
+	DeletePod(namespace string, name string) error
+}
+
+// realPodControl is the default implementation of podControlInterface.
+type realPodControl struct {
+	KubeClient *client.Client
+	Recorder   record.EventRecorder
+}
+
+var _ podControlInterface = &realPodControl{}
+
+func (r realPodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) {
+	return r.KubeClient.Pods(namespace).List(opts)
+}
+
+func (r realPodControl) DeletePod(namespace string, name string) error {
+	return r.KubeClient.Pods(namespace).Delete(name, nil)
+}
+
+type fakePodControl struct {
+	sync.Mutex
+	Pods          []api.Pod
+	DeletePodName []string
+	Err           error
+}
+
+var _ podControlInterface = &fakePodControl{}
+
+func (f *fakePodControl) ListPods(namespace string, opts api.ListOptions) (*api.PodList, error) {
+	f.Lock()
+	defer f.Unlock()
+	return &api.PodList{Items: f.Pods}, nil
+}
+
+func (f *fakePodControl) DeletePod(namespace string, name string) error {
+	f.Lock()
+	defer f.Unlock()
+	if f.Err != nil {
+		return f.Err
+	}
+	f.DeletePodName = append(f.DeletePodName, name)
+	return nil
+}
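The fakes above keep this path unit-testable: fakeJobControl gains a Job field that GetJob returns, and fakePodControl records every pod name passed to DeletePod. A minimal sketch of how a test wires them into SyncOne, mirroring the test hunks above (sj, js, job, and tc come from the table-driven test setup and are not defined here):

	jc := &fakeJobControl{Job: job}
	sjc := &fakeSJControl{}
	pc := &fakePodControl{}
	recorder := record.NewFakeRecorder(10)

	SyncOne(sj, js, tc.now, jc, sjc, pc, recorder)

	// jc.DeleteJobName and pc.DeletePodName now record what the controller deleted.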