From e29e606792c69d6aa03f31aee7b5669799dead19 Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Thu, 17 Sep 2015 20:13:00 -0700 Subject: [PATCH] make JobController RestartPolicy independent --- pkg/controller/job/controller.go | 15 ++------ pkg/controller/job/controller_test.go | 52 +++++++++++---------------- 2 files changed, 23 insertions(+), 44 deletions(-) diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 0ec8023f9e6..17e65367438 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -322,14 +322,11 @@ func (jm *JobController) syncJob(key string) error { activePods := controller.FilterActivePods(podList.Items) active := len(activePods) - successful, unsuccessful := getStatus(jobKey, job.Spec.Template.Spec.RestartPolicy, podList.Items) + successful, unsuccessful := getStatus(podList.Items) if jobNeedsSync { active = jm.manageJob(activePods, successful, unsuccessful, &job) } completions := successful - if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever { - completions += unsuccessful - } if completions == *job.Spec.Completions { job.Status.Conditions = append(job.Status.Conditions, newCondition()) } @@ -357,11 +354,9 @@ func newCondition() experimental.JobCondition { } } -func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (successful, unsuccessful int) { +func getStatus(pods []api.Pod) (successful, unsuccessful int) { successful = filterPods(pods, api.PodSucceeded) - if restartPolicy == api.RestartPolicyNever { - unsuccessful = filterPods(pods, api.PodFailed) - } + unsuccessful = filterPods(pods, api.PodFailed) return } @@ -403,10 +398,6 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf } else if active < parallelism { // how many executions are left to run diff := *job.Spec.Completions - successful - // for RestartPolicyNever we need to count unsuccessful pods as well - if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever { - diff -= unsuccessful - } // limit to parallelism and count active pods as well if diff > parallelism { diff = parallelism diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index d6bcda20dc3..5a5d36595ab 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -79,7 +79,7 @@ func (f *FakePodControl) clear() { f.podSpec = []api.PodTemplateSpec{} } -func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *experimental.Job { +func newJob(parallelism, completions int) *experimental.Job { return &experimental.Job{ ObjectMeta: api.ObjectMeta{ Name: "foobar", @@ -96,7 +96,6 @@ func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *expe }, }, Spec: api.PodSpec{ - RestartPolicy: restartPolicy, Containers: []api.Container{ {Image: "foo/bar"}, }, @@ -135,9 +134,8 @@ func newPodList(count int, status api.PodPhase, job *experimental.Job) []api.Pod func TestControllerSyncJob(t *testing.T) { testCases := map[string]struct { // job setup - parallelism int - completions int - restartPolicy api.RestartPolicy + parallelism int + completions int // pod setup podControllerError error @@ -154,62 +152,52 @@ func TestControllerSyncJob(t *testing.T) { expectedComplete bool }{ "job start": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, nil, 0, 0, 0, 2, 0, 2, 0, 0, false, }, "correct # of pods": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, nil, 2, 0, 0, 0, 0, 2, 0, 0, false, }, "too few active pods": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, nil, 1, 1, 0, 1, 0, 2, 1, 0, false, }, "too few active pods, with controller error": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, fmt.Errorf("Fake error"), 1, 1, 0, 0, 0, 1, 1, 0, false, }, "too many active pods": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, nil, 3, 0, 0, 0, 1, 2, 0, 0, false, }, "too many active pods, with controller error": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, fmt.Errorf("Fake error"), 3, 0, 0, 0, 0, 3, 0, 0, false, }, - "failed pod and OnFailure restart policy": { - 2, 5, api.RestartPolicyOnFailure, - nil, 1, 1, 1, - 1, 0, 2, 1, 0, false, - }, - "failed pod and Never restart policy": { - 2, 5, api.RestartPolicyNever, + "failed pod": { + 2, 5, nil, 1, 1, 1, 1, 0, 2, 1, 1, false, }, - "job finish and OnFailure restart policy": { - 2, 5, api.RestartPolicyOnFailure, + "job finish": { + 2, 5, nil, 0, 5, 0, 0, 0, 0, 5, 0, true, }, - "job finish and Never restart policy": { - 2, 5, api.RestartPolicyNever, - nil, 0, 2, 3, - 0, 0, 0, 2, 3, true, - }, "more active pods than completions": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, nil, 10, 0, 0, 0, 8, 2, 0, 0, false, }, "status change": { - 2, 5, api.RestartPolicyOnFailure, + 2, 5, nil, 2, 2, 0, 0, 0, 2, 2, 0, false, }, @@ -229,7 +217,7 @@ func TestControllerSyncJob(t *testing.T) { } // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.restartPolicy) + job := newJob(tc.parallelism, tc.completions) manager.jobStore.Store.Add(job) for _, pod := range newPodList(tc.activePods, api.PodRunning, job) { manager.podStore.Store.Add(&pod) @@ -287,7 +275,7 @@ func TestSyncJobDeleted(t *testing.T) { manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.updateHandler = func(job *experimental.Job) error { return nil } - job := newJob(2, 2, api.RestartPolicyOnFailure) + job := newJob(2, 2) err := manager.syncJob(getKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) @@ -307,7 +295,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady manager.updateHandler = func(job *experimental.Job) error { return fmt.Errorf("Fake error") } - job := newJob(2, 2, api.RestartPolicyOnFailure) + job := newJob(2, 2) manager.jobStore.Store.Add(job) err := manager.syncJob(getKey(job, t)) if err != nil { @@ -418,7 +406,7 @@ func TestSyncJobExpectations(t *testing.T) { manager.podStoreSynced = alwaysReady manager.updateHandler = func(job *experimental.Job) error { return nil } - job := newJob(2, 2, api.RestartPolicyOnFailure) + job := newJob(2, 2) manager.jobStore.Store.Add(job) pods := newPodList(2, api.PodPending, job) manager.podStore.Store.Add(&pods[0]) @@ -516,7 +504,7 @@ func TestWatchPods(t *testing.T) { manager.podStoreSynced = alwaysReady // Put one job and one pod into the store - testJob := newJob(2, 2, api.RestartPolicyOnFailure) + testJob := newJob(2, 2) manager.jobStore.Store.Add(testJob) received := make(chan string) // The pod update sent through the fakeWatcher should figure out the managing job and