make JobController RestartPolicy independent

This commit is contained in:
Mike Danese 2015-09-17 20:13:00 -07:00
parent 22072af90d
commit e29e606792
2 changed files with 23 additions and 44 deletions

View File

@@ -322,14 +322,11 @@ func (jm *JobController) syncJob(key string) error {
 	activePods := controller.FilterActivePods(podList.Items)
 	active := len(activePods)
-	successful, unsuccessful := getStatus(jobKey, job.Spec.Template.Spec.RestartPolicy, podList.Items)
+	successful, unsuccessful := getStatus(podList.Items)
 	if jobNeedsSync {
 		active = jm.manageJob(activePods, successful, unsuccessful, &job)
 	}
 	completions := successful
-	if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
-		completions += unsuccessful
-	}
 	if completions == *job.Spec.Completions {
 		job.Status.Conditions = append(job.Status.Conditions, newCondition())
 	}
@@ -357,11 +354,9 @@ func newCondition() experimental.JobCondition {
 	}
 }
-func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (successful, unsuccessful int) {
+func getStatus(pods []api.Pod) (successful, unsuccessful int) {
 	successful = filterPods(pods, api.PodSucceeded)
-	if restartPolicy == api.RestartPolicyNever {
-		unsuccessful = filterPods(pods, api.PodFailed)
-	}
+	unsuccessful = filterPods(pods, api.PodFailed)
 	return
 }
@@ -403,10 +398,6 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
 	} else if active < parallelism {
 		// how many executions are left to run
 		diff := *job.Spec.Completions - successful
-		// for RestartPolicyNever we need to count unsuccessful pods as well
-		if job.Spec.Template.Spec.RestartPolicy == api.RestartPolicyNever {
-			diff -= unsuccessful
-		}
 		// limit to parallelism and count active pods as well
 		if diff > parallelism {
 			diff = parallelism

View File

@@ -79,7 +79,7 @@ func (f *FakePodControl) clear() {
 	f.podSpec = []api.PodTemplateSpec{}
 }
-func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *experimental.Job {
+func newJob(parallelism, completions int) *experimental.Job {
 	return &experimental.Job{
 		ObjectMeta: api.ObjectMeta{
 			Name: "foobar",
@@ -96,7 +96,6 @@ func newJob(parallelism, completions int, restartPolicy api.RestartPolicy) *expe
 			},
 		},
 		Spec: api.PodSpec{
-			RestartPolicy: restartPolicy,
 			Containers: []api.Container{
 				{Image: "foo/bar"},
 			},
@@ -135,9 +134,8 @@ func newPodList(count int, status api.PodPhase, job *experimental.Job) []api.Pod
 func TestControllerSyncJob(t *testing.T) {
 	testCases := map[string]struct {
 		// job setup
-		parallelism   int
-		completions   int
-		restartPolicy api.RestartPolicy
+		parallelism int
+		completions int
 		// pod setup
 		podControllerError error
@@ -154,62 +152,52 @@ func TestControllerSyncJob(t *testing.T) {
 		expectedComplete bool
 	}{
 		"job start": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 0, 0, 0,
 			2, 0, 2, 0, 0, false,
 		},
 		"correct # of pods": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 2, 0, 0,
 			0, 0, 2, 0, 0, false,
 		},
 		"too few active pods": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 1, 1, 0,
 			1, 0, 2, 1, 0, false,
 		},
 		"too few active pods, with controller error": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			fmt.Errorf("Fake error"), 1, 1, 0,
 			0, 0, 1, 1, 0, false,
 		},
 		"too many active pods": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 3, 0, 0,
 			0, 1, 2, 0, 0, false,
 		},
 		"too many active pods, with controller error": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			fmt.Errorf("Fake error"), 3, 0, 0,
 			0, 0, 3, 0, 0, false,
 		},
-		"failed pod and OnFailure restart policy": {
-			2, 5, api.RestartPolicyOnFailure,
-			nil, 1, 1, 1,
-			1, 0, 2, 1, 0, false,
-		},
-		"failed pod and Never restart policy": {
-			2, 5, api.RestartPolicyNever,
+		"failed pod": {
+			2, 5,
 			nil, 1, 1, 1,
 			1, 0, 2, 1, 1, false,
 		},
-		"job finish and OnFailure restart policy": {
-			2, 5, api.RestartPolicyOnFailure,
+		"job finish": {
+			2, 5,
 			nil, 0, 5, 0,
 			0, 0, 0, 5, 0, true,
 		},
-		"job finish and Never restart policy": {
-			2, 5, api.RestartPolicyNever,
-			nil, 0, 2, 3,
-			0, 0, 0, 2, 3, true,
-		},
 		"more active pods than completions": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 10, 0, 0,
 			0, 8, 2, 0, 0, false,
 		},
 		"status change": {
-			2, 5, api.RestartPolicyOnFailure,
+			2, 5,
 			nil, 2, 2, 0,
 			0, 0, 2, 2, 0, false,
 		},
@@ -229,7 +217,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 		// job & pods setup
-		job := newJob(tc.parallelism, tc.completions, tc.restartPolicy)
+		job := newJob(tc.parallelism, tc.completions)
 		manager.jobStore.Store.Add(job)
 		for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
 			manager.podStore.Store.Add(&pod)
@@ -287,7 +275,7 @@ func TestSyncJobDeleted(t *testing.T) {
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
 	manager.updateHandler = func(job *experimental.Job) error { return nil }
-	job := newJob(2, 2, api.RestartPolicyOnFailure)
+	job := newJob(2, 2)
 	err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
@@ -307,7 +295,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
 	manager.updateHandler = func(job *experimental.Job) error { return fmt.Errorf("Fake error") }
-	job := newJob(2, 2, api.RestartPolicyOnFailure)
+	job := newJob(2, 2)
 	manager.jobStore.Store.Add(job)
 	err := manager.syncJob(getKey(job, t))
 	if err != nil {
@@ -418,7 +406,7 @@ func TestSyncJobExpectations(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 	manager.updateHandler = func(job *experimental.Job) error { return nil }
-	job := newJob(2, 2, api.RestartPolicyOnFailure)
+	job := newJob(2, 2)
 	manager.jobStore.Store.Add(job)
 	pods := newPodList(2, api.PodPending, job)
 	manager.podStore.Store.Add(&pods[0])
@@ -516,7 +504,7 @@ func TestWatchPods(t *testing.T) {
 	manager.podStoreSynced = alwaysReady
 	// Put one job and one pod into the store
-	testJob := newJob(2, 2, api.RestartPolicyOnFailure)
+	testJob := newJob(2, 2)
 	manager.jobStore.Store.Add(testJob)
 	received := make(chan string)
 	// The pod update sent through the fakeWatcher should figure out the managing job and