From 53ee76fe1a14cc45496c5ca4b1869fdd4bd89251 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Mon, 14 Dec 2015 15:26:16 -0800 Subject: [PATCH] Support Work Queue jobs with variable parallelism When job.spec.completions is nil, only one task needs to succeed for the job to succeed, and parallelism can be scaled freely during runtime. Added tests. Release Note: This causes two minor changes to the API. First, unset parallelism previously was defaulted to be equal to completions. Now it always defaults to 1 if unset. Second, having parallelism=N and completions unset would previously be defaulted to 1 completion and N parallelism. (this is not something we expect people to do, though) Now, no defaulting occurs in that case, and the job's behavior is different (any completion causes success). --- api/swagger-spec/v1beta1.json | 2 +- .../extensions/v1beta1/definitions.html | 6 +- docs/user-guide/jobs.md | 95 +++++++---- examples/job/work-queue-2/README.md | 20 ++- examples/job/work-queue-2/job.yaml | 1 - pkg/apis/extensions/types.go | 2 +- pkg/apis/extensions/v1beta1/defaults.go | 13 +- pkg/apis/extensions/v1beta1/defaults_test.go | 147 ++++++++++++++---- pkg/apis/extensions/v1beta1/types.go | 6 +- .../v1beta1/types_swagger_doc_generated.go | 2 +- pkg/client/unversioned/conditions.go | 11 +- pkg/controller/job/controller.go | 58 ++++++- pkg/controller/job/controller_test.go | 47 +++++- pkg/kubectl/describe.go | 6 +- 14 files changed, 325 insertions(+), 91 deletions(-) diff --git a/api/swagger-spec/v1beta1.json b/api/swagger-spec/v1beta1.json index c6869483de2..18adf977002 100644 --- a/api/swagger-spec/v1beta1.json +++ b/api/swagger-spec/v1beta1.json @@ -4025,7 +4025,7 @@ "completions": { "type": "integer", "format": "int32", - "description": "Completions specifies the desired number of successfully finished pods the job should be run with. Defaults to 1. More info: http://releases.k8s.io/HEAD/docs/user-guide/jobs.md" + "description": "Completions specifies the desired number of successfully finished pods the job should be run with. Setting to nil means that the success of any pod signals the success of all pods, and allows parallelism to have any positive value. Setting to 1 means that parallelism is limited to 1 and the success of that pod signals the success of the job." }, "activeDeadlineSeconds": { "type": "integer", diff --git a/docs/api-reference/extensions/v1beta1/definitions.html b/docs/api-reference/extensions/v1beta1/definitions.html index bca7e32ad52..f2e589cf5c0 100755 --- a/docs/api-reference/extensions/v1beta1/definitions.html +++ b/docs/api-reference/extensions/v1beta1/definitions.html @@ -3633,7 +3633,7 @@ Populated by the system when a graceful deletion is requested. Read-only. More i

completions

-

Completions specifies the desired number of successfully finished pods the job should be run with. Defaults to 1. More info: http://releases.k8s.io/HEAD/docs/user-guide/jobs.md

+

Completions specifies the desired number of successfully finished pods the job should be run with. Setting to nil means that the success of any pod signals the success of all pods, and allows parallelism to have any positive value. Setting to 1 means that parallelism is limited to 1 and the success of that pod signals the success of the job.

false

integer (int32)

@@ -4573,8 +4573,8 @@ Populated by the system when a graceful deletion is requested. Read-only. More i - \ No newline at end of file + diff --git a/docs/user-guide/jobs.md b/docs/user-guide/jobs.md index b0b58ac0d1f..ea7c2163c9a 100644 --- a/docs/user-guide/jobs.md +++ b/docs/user-guide/jobs.md @@ -43,7 +43,8 @@ Documentation for other releases can be found at - [Writing a Job Spec](#writing-a-job-spec) - [Pod Template](#pod-template) - [Pod Selector](#pod-selector) - - [Parallelism and Completions](#parallelism-and-completions) + - [Parallel Jobs](#parallel-jobs) + - [Controlling Parallelism](#controlling-parallelism) - [Handling Pod and Container Failures](#handling-pod-and-container-failures) - [Job Patterns](#job-patterns) - [Alternatives](#alternatives) @@ -103,7 +104,7 @@ Run the example job by downloading the example file and then running this comman ```console $ kubectl create -f ./job.yaml -jobs/pi +job "pi" created ``` Check on the status of the job using this command: @@ -113,16 +114,17 @@ $ kubectl describe jobs/pi Name: pi Namespace: default Image(s): perl -Selector: app=pi -Parallelism: 2 +Selector: app in (pi) +Parallelism: 1 Completions: 1 -Labels: -Pods Statuses: 1 Running / 0 Succeeded / 0 Failed +Start Time: Mon, 11 Jan 2016 15:35:52 -0800 +Labels: app=pi +Pods Statuses: 0 Running / 1 Succeeded / 0 Failed +No volumes. Events: - FirstSeen LastSeen Count From SubobjectPath Reason Message - ───────── ──────── ───── ──── ───────────── ────── ─────── - 1m 1m 1 {job } SuccessfulCreate Created pod: pi-z548a - + FirstSeen LastSeen Count From SubobjectPath Type Reason Message + --------- -------- ----- ---- ------------- -------- ------ ------- + 1m 1m 1 {job-controller } Normal SuccessfulCreate Created pod: pi-dtn4q ``` To view completed pods of a job, use `kubectl get pods --show-all`. The `--show-all` will show completed pods too. @@ -141,7 +143,7 @@ that just gets the name from each pod in the returned list. View the standard output of one of the pods: ```console -$ kubectl logs pi-aiw0a +$ kubectl logs $pods 3.1415926535897932384626433832795028841971693993751058209749445923078164062862089986280348253421170679821480865132823066470938446095505822317253594081284811174502841027019385211055596446229489549303819644288109756659334461284756482337867831652712019091456485669234603486104543266482133936072602491412737245870066063155881748815209209628292540917153643678925903600113305305488204665213841469519415116094330572703657595919530921861173819326117931051185480744623799627495673518857527248912279381830119491298336733624406566430860213949463952247371907021798609437027705392171762931767523846748184676694051320005681271452635608277857713427577896091736371787214684409012249534301465495853710507922796892589235420199561121290219608640344181598136297747713099605187072113499999983729780499510597317328160963185950244594553469083026425223082533446850352619311881710100031378387528865875332083814206171776691473035982534904287554687311595628638823537875937519577818577805321712268066130019278766111959092164201989380952572010654858632788659361533818279682303019520353018529689957736225994138912497217752834791315155748572424541506959508295331168617278558890750983817546374649393192550604009277016711390098488240128583616035637076601047101819429555961989467678374494482553797747268471040475346462080466842590694912933136770289891521047521620569660240580381501935112533824300355876402474964732639141992726042699227967823547816360093417216412199245863150302861829745557067498385054945885869269956909272107975093029553211653449872027559602364806654991198818347977535663698074265425278625518184175746728909777727938000816470600161452491921732172147723501414419735685481613611573525521334757418494684385233239073941433345477624168625189835694855620992192221842725502542568876717904946016534668049886272327917860857843838279679766814541009538837863609506800642251252051173929848960841284886269456042419652850222106611863067442786220391949450471237137869609563643719172874677646575739624138908658326459958133904780275901 ``` @@ -184,27 +186,66 @@ Also you should not normally create any pods whose labels match this selector, e via another Job, or via another controller such as ReplicationController. Otherwise, the Job will think that those pods were created by it. Kubernetes will not stop you from doing this. -### Parallelism and Completions +### Parallel Jobs -By default, a Job is complete when one Pod runs to successful completion. +There are three main types of jobs: -A single Job object can also be used to control multiple pods running in -parallel. There are several different [patterns for running parallel -jobs](#job-patterns). +1. Non-parallel Jobs + - normally only one pod is started, unless the pod fails. + - job is complete as soon as Pod terminates successfully. +1. Parallel Jobs with a *fixed completion count*: + - specify a non-zero positive value for `.spec.completions` + - the job is complete when there is one successful pod for each value in the range 1 to `.spec.completions`. + - **not implemented yet:** each pod passed a different index in the range 1 to `.spec.completions`. +1. Parallel Jobs with a *work queue*: + - do not specify `.spec.completions` + - the pods must coordinate with themselves or an external service to determine what each should work on + - each pod is independently capable of determining whether or not all its peers are done, thus the entire Job is done. + - when _any_ pod terminates with success, no new pods are created. + - once at least one pod has terminated with success and all pods are terminated, then the job is completed with success. + - once any pod has exited with success, no other pod should still be doing any work or writing any output. They should all be + in the process of exiting. -With some of these patterns, you can suggest how many pods should run -concurrently by setting `.spec.parallelism` to the number of pods you would -like to have running concurrently. This number is a suggestion. The number -running concurrently may be lower or higher for a variety of reasons. For -example, it may be lower if the number of remaining completions is less, or as -the controller is ramping up, or if it is throttling the job due to excessive -failures. It may be higher for example if a pod is gracefully shutdown, and -the replacement starts early. +For a Non-parallel job, you can leave both `.spec.completions` and `.spec.parallelism` unset. When both are +unset, both are defaulted to 1. -If you do not specify `.spec.parallelism`, then it defaults to `.spec.completions`. +For a Fixed Completion Count job, you should set `.spec.completions` to the number of completions needed. +You can set `.spec.parallelism`, or leave it unset and it will default to 1. + +For a Work Queue Job, you must leave `.spec.completions` unset, and set `.spec.parallelism` to +a non-negative integer. + +For more information about how to make use of the different types of job, see the [job patterns](#job-patterns) section. + + +#### Controlling Parallelism + +The requested parallelism (`.spec.parallelism`) can be set to any non-negative value. +If it is unspecified, it defaults to 1. +If it is specified as 0, then the Job is effectively paused until it is increased. + +A job can be scaled up using the `kubectl scale` command. For example, the following +command sets `.spec.parallelism` of a job called `myjob` to 10: + +```console +$ kubectl scale --replicas=$N jobs/myjob +job "myjob" scaled +``` + +You can also use the `scale` subresource of the Job resource. + +Actual parallelism (number of pods running at any instant) may be more or less than requested +parallelism, for a variety or reasons: + +- For Fixed Completion Count jobs, the actual number of pods running in parallel will not exceed the number of + remaining completions. Higher values of `.spec.parallelism` are effectively ignored. +- For work queue jobs, no new pods are started after any pod has succeded -- remaining pods are allowed to complete, however. +- If the controller has not had time to react. +- If the controller failed to create pods for any reason (lack of ResourceQuota, lack of permission, etc), + then there may be fewer pods than requested. +- The controller may throttle new pod creation due to excessive previous pod failures in the same Job. +- When a pod is gracefully shutdown, it make take time to stop. -Depending on the pattern you are using, you will either set `.spec.completions` -to 1 or to the number of units of work (see [Job Patterns] for an explanation). ## Handling Pod and Container Failures diff --git a/examples/job/work-queue-2/README.md b/examples/job/work-queue-2/README.md index 71a149b391b..6645f03f6ce 100644 --- a/examples/job/work-queue-2/README.md +++ b/examples/job/work-queue-2/README.md @@ -185,6 +185,8 @@ using above links. Then build the image: $ docker build -t job-wq-2 . ``` +### Push the image + For the [Docker Hub](https://hub.docker.com/), tag your app image with your username and push to the Hub with the below commands. Replace `` with your Hub username. @@ -194,6 +196,9 @@ docker tag job-wq-2 /job-wq-2 docker push /job-wq-2 ``` +You need to push to a public repository or [configure your cluster to be able to access +your private repository](../../../docs/user-guide/images.md). + If you are using [Google Container Registry](https://cloud.google.com/tools/container-registry/), tag your app image with your project ID, and push to GCR. Replace @@ -220,7 +225,6 @@ spec: selector: matchLabels: app: job-wq-2 - completions: 1 parallelism: 2 template: metadata: @@ -263,17 +267,19 @@ Now wait a bit, then check on the job. $ ./kubectl describe jobs/job-wq-2 Name: job-wq-2 Namespace: default -Image(s): gcr.io/causal-jigsaw-637/job-wq-2 +Image(s): gcr.io/exampleproject/job-wq-2 Selector: app in (job-wq-2) Parallelism: 2 -Completions: 1 +Completions: Unset +Start Time: Mon, 11 Jan 2016 17:07:59 -0800 Labels: app=job-wq-2 -Pods Statuses: 0 Running / 1 Succeeded / 0 Failed +Pods Statuses: 1 Running / 0 Succeeded / 0 Failed No volumes. Events: - FirstSeen LastSeen Count From SubobjectPath Reason Message - ───────── ──────── ───── ──── ───────────── ────── ─────── - 1m 1m 1 {job } SuccessfulCreate Created pod: job-wq-2-7r7b2 + FirstSeen LastSeen Count From SubobjectPath Type Reason Message + --------- -------- ----- ---- ------------- -------- ------ ------- + 33s 33s 1 {job-controller } Normal SuccessfulCreate Created pod: job-wq-2-lglf8 + $ kubectl logs pods/job-wq-2-7r7b2 Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f diff --git a/examples/job/work-queue-2/job.yaml b/examples/job/work-queue-2/job.yaml index c1abf856ffa..09a55ec4b61 100644 --- a/examples/job/work-queue-2/job.yaml +++ b/examples/job/work-queue-2/job.yaml @@ -6,7 +6,6 @@ spec: selector: matchLabels: app: job-wq-2 - completions: 1 parallelism: 2 template: metadata: diff --git a/pkg/apis/extensions/types.go b/pkg/apis/extensions/types.go index e5b735b4835..99fd07a2133 100644 --- a/pkg/apis/extensions/types.go +++ b/pkg/apis/extensions/types.go @@ -481,7 +481,7 @@ type JobSpec struct { Parallelism *int `json:"parallelism,omitempty"` // Completions specifies the desired number of successfully finished pods the - // job should be run with. Defaults to 1. + // job should be run with. When unset, any pod exiting signals the job to complete. Completions *int `json:"completions,omitempty"` // Optional duration in seconds relative to the startTime that the job may be active diff --git a/pkg/apis/extensions/v1beta1/defaults.go b/pkg/apis/extensions/v1beta1/defaults.go index 9e270e7b3b6..c931c61019e 100644 --- a/pkg/apis/extensions/v1beta1/defaults.go +++ b/pkg/apis/extensions/v1beta1/defaults.go @@ -120,12 +120,17 @@ func addDefaultingFuncs(scheme *runtime.Scheme) { obj.Labels = labels } } - if obj.Spec.Completions == nil { - completions := int32(1) - obj.Spec.Completions = &completions + // For a non-parallel job, you can leave both `.spec.completions` and + // `.spec.parallelism` unset. When both are unset, both are defaulted to 1. + if obj.Spec.Completions == nil && obj.Spec.Parallelism == nil { + obj.Spec.Completions = new(int32) + *obj.Spec.Completions = 1 + obj.Spec.Parallelism = new(int32) + *obj.Spec.Parallelism = 1 } if obj.Spec.Parallelism == nil { - obj.Spec.Parallelism = obj.Spec.Completions + obj.Spec.Parallelism = new(int32) + *obj.Spec.Parallelism = 1 } }, func(obj *HorizontalPodAutoscaler) { diff --git a/pkg/apis/extensions/v1beta1/defaults_test.go b/pkg/apis/extensions/v1beta1/defaults_test.go index 082fc2adb41..fb33de040f6 100644 --- a/pkg/apis/extensions/v1beta1/defaults_test.go +++ b/pkg/apis/extensions/v1beta1/defaults_test.go @@ -302,7 +302,124 @@ func TestSetDefaultDeployment(t *testing.T) { } } -func TestSetDefaultJob(t *testing.T) { +func TestSetDefaultJobParallelismAndCompletions(t *testing.T) { + tests := []struct { + original *Job + expected *Job + }{ + // both unspecified -> sets both to 1 + { + original: &Job{ + Spec: JobSpec{}, + }, + expected: &Job{ + Spec: JobSpec{ + Completions: newInt32(1), + Parallelism: newInt32(1), + }, + }, + }, + // WQ: Parallelism explicitly 0 and completions unset -> no change + { + original: &Job{ + Spec: JobSpec{ + Parallelism: newInt32(0), + }, + }, + expected: &Job{ + Spec: JobSpec{ + Parallelism: newInt32(0), + }, + }, + }, + // WQ: Parallelism explicitly 2 and completions unset -> no change + { + original: &Job{ + Spec: JobSpec{ + Parallelism: newInt32(2), + }, + }, + expected: &Job{ + Spec: JobSpec{ + Parallelism: newInt32(2), + }, + }, + }, + // Completions explicitly 2 and parallelism unset -> parallelism is defaulted + { + original: &Job{ + Spec: JobSpec{ + Completions: newInt32(2), + }, + }, + expected: &Job{ + Spec: JobSpec{ + Completions: newInt32(2), + Parallelism: newInt32(1), + }, + }, + }, + // Both set -> no change + { + original: &Job{ + Spec: JobSpec{ + Completions: newInt32(10), + Parallelism: newInt32(11), + }, + }, + expected: &Job{ + Spec: JobSpec{ + Completions: newInt32(10), + Parallelism: newInt32(11), + }, + }, + }, + // Both set, flipped -> no change + { + original: &Job{ + Spec: JobSpec{ + Completions: newInt32(11), + Parallelism: newInt32(10), + }, + }, + expected: &Job{ + Spec: JobSpec{ + Completions: newInt32(11), + Parallelism: newInt32(10), + }, + }, + }, + } + + for _, tc := range tests { + original := tc.original + expected := tc.expected + obj2 := roundTrip(t, runtime.Object(original)) + got, ok := obj2.(*Job) + if !ok { + t.Errorf("unexpected object: %v", got) + t.FailNow() + } + if (got.Spec.Completions == nil) != (expected.Spec.Completions == nil) { + t.Errorf("got different *completions than expected: %v %v", got.Spec.Completions, expected.Spec.Completions) + } + if got.Spec.Completions != nil && expected.Spec.Completions != nil { + if *got.Spec.Completions != *expected.Spec.Completions { + t.Errorf("got different completions than expected: %d %d", *got.Spec.Completions, *expected.Spec.Completions) + } + } + if (got.Spec.Parallelism == nil) != (expected.Spec.Parallelism == nil) { + t.Errorf("got different *Parallelism than expected: %v %v", got.Spec.Parallelism, expected.Spec.Parallelism) + } + if got.Spec.Parallelism != nil && expected.Spec.Parallelism != nil { + if *got.Spec.Parallelism != *expected.Spec.Parallelism { + t.Errorf("got different parallelism than expected: %d %d", *got.Spec.Parallelism, *expected.Spec.Parallelism) + } + } + } +} + +func TestSetDefaultJobSelector(t *testing.T) { expected := &Job{ Spec: JobSpec{ Selector: &LabelSelector{ @@ -331,28 +448,6 @@ func TestSetDefaultJob(t *testing.T) { }, }, }, - // selector from template labels, completions set explicitly, parallelism - default - { - Spec: JobSpec{ - Completions: newInt32(1), - Template: v1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"job": "selector"}, - }, - }, - }, - }, - // selector from template labels, completions - default, parallelism set explicitly - { - Spec: JobSpec{ - Parallelism: newInt32(1), - Template: v1.PodTemplateSpec{ - ObjectMeta: v1.ObjectMeta{ - Labels: map[string]string{"job": "selector"}, - }, - }, - }, - }, } for _, original := range tests { @@ -362,12 +457,6 @@ func TestSetDefaultJob(t *testing.T) { t.Errorf("unexpected object: %v", got) t.FailNow() } - if *got.Spec.Completions != *expected.Spec.Completions { - t.Errorf("got different completions than expected: %d %d", *got.Spec.Completions, *expected.Spec.Completions) - } - if *got.Spec.Parallelism != *expected.Spec.Parallelism { - t.Errorf("got different parallelism than expected: %d %d", *got.Spec.Parallelism, *expected.Spec.Parallelism) - } if !reflect.DeepEqual(got.Spec.Selector, expected.Spec.Selector) { t.Errorf("got different selectors %#v %#v", got.Spec.Selector, expected.Spec.Selector) } diff --git a/pkg/apis/extensions/v1beta1/types.go b/pkg/apis/extensions/v1beta1/types.go index 3c0e4206ead..3e04c58567f 100644 --- a/pkg/apis/extensions/v1beta1/types.go +++ b/pkg/apis/extensions/v1beta1/types.go @@ -473,8 +473,10 @@ type JobSpec struct { Parallelism *int32 `json:"parallelism,omitempty"` // Completions specifies the desired number of successfully finished pods the - // job should be run with. Defaults to 1. - // More info: http://releases.k8s.io/HEAD/docs/user-guide/jobs.md + // job should be run with. Setting to nil means that the success of any + // pod signals the success of all pods, and allows parallelism to have any positive + // value. Setting to 1 means that parallelism is limited to 1 and the success of that + // pod signals the success of the job. Completions *int32 `json:"completions,omitempty"` // Optional duration in seconds relative to the startTime that the job may be active diff --git a/pkg/apis/extensions/v1beta1/types_swagger_doc_generated.go b/pkg/apis/extensions/v1beta1/types_swagger_doc_generated.go index 686c0f2b2bd..25f05733d9f 100644 --- a/pkg/apis/extensions/v1beta1/types_swagger_doc_generated.go +++ b/pkg/apis/extensions/v1beta1/types_swagger_doc_generated.go @@ -381,7 +381,7 @@ func (JobList) SwaggerDoc() map[string]string { var map_JobSpec = map[string]string{ "": "JobSpec describes how the job execution will look like.", "parallelism": "Parallelism specifies the maximum desired number of pods the job should run at any given time. The actual number of pods running in steady state will be less than this number when ((.spec.completions - .status.successful) < .spec.parallelism), i.e. when the work left to do is less than max parallelism. More info: http://releases.k8s.io/HEAD/docs/user-guide/jobs.md", - "completions": "Completions specifies the desired number of successfully finished pods the job should be run with. Defaults to 1. More info: http://releases.k8s.io/HEAD/docs/user-guide/jobs.md", + "completions": "Completions specifies the desired number of successfully finished pods the job should be run with. Setting to nil means that the success of any pod signals the success of all pods, and allows parallelism to have any positive value. Setting to 1 means that parallelism is limited to 1 and the success of that pod signals the success of the job.", "activeDeadlineSeconds": "Optional duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer", "selector": "Selector is a label query over pods that should match the pod count. More info: http://releases.k8s.io/HEAD/docs/user-guide/labels.md#label-selectors", "template": "Template is the object that describes the pod that will be created when executing a job. More info: http://releases.k8s.io/HEAD/docs/user-guide/jobs.md", diff --git a/pkg/client/unversioned/conditions.go b/pkg/client/unversioned/conditions.go index e2a27b6b1b1..34c67258810 100644 --- a/pkg/client/unversioned/conditions.go +++ b/pkg/client/unversioned/conditions.go @@ -115,9 +115,14 @@ func JobHasDesiredParallelism(c ExtensionsInterface, job *extensions.Job) wait.C if job.Spec.Parallelism != nil && job.Status.Active == *job.Spec.Parallelism { return true, nil } - // otherwise count successful - progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded - return progress == 0, nil + if job.Spec.Completions == nil { + // A job without specified completions needs to wait for Active to reach Parallelism. + return false, nil + } else { + // otherwise count successful + progress := *job.Spec.Completions - job.Status.Active - job.Status.Succeeded + return progress == 0, nil + } } } diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index abb74ac6707..e5f909f309b 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -362,7 +362,33 @@ func (jm *JobController) syncJob(key string) error { active = jm.manageJob(activePods, succeeded, &job) } completions := succeeded - if completions == *job.Spec.Completions { + complete := false + if job.Spec.Completions == nil { + // This type of job is complete when any pod exits with success. + // Each pod is capable of + // determining whether or not the entire Job is done. Subsequent pods are + // not expected to fail, but if they do, the failure is ignored. Once any + // pod succeeds, the controller waits for remaining pods to finish, and + // then the job is complete. + if succeeded > 0 && active == 0 { + complete = true + } + } else { + // Job specifies a number of completions. This type of job signals + // success by having that number of successes. Since we do not + // start more pods than there are remaining completions, there should + // not be any remaining active pods once this count is reached. + if completions >= *job.Spec.Completions { + complete = true + if active > 0 { + jm.recorder.Event(&job, api.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached") + } + if completions > *job.Spec.Completions { + jm.recorder.Event(&job, api.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") + } + } + } + if complete { job.Status.Conditions = append(job.Status.Conditions, newCondition(extensions.JobComplete, "", "")) now := unversioned.Now() job.Status.CompletionTime = &now @@ -453,15 +479,31 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int, job *ex wait.Wait() } else if active < parallelism { - // how many executions are left to run - diff := *job.Spec.Completions - succeeded - // limit to parallelism and count active pods as well - if diff > parallelism { - diff = parallelism + wantActive := 0 + if job.Spec.Completions == nil { + // Job does not specify a number of completions. Therefore, number active + // should be equal to parallelism, unless the job has seen at least + // once success, in which leave whatever is running, running. + if succeeded > 0 { + wantActive = active + } else { + wantActive = parallelism + } + } else { + // Job specifies a specific number of completions. Therefore, number + // active should not ever exceed number of remaining completions. + wantActive = *job.Spec.Completions - succeeded + if wantActive > parallelism { + wantActive = parallelism + } + } + diff := wantActive - active + if diff < 0 { + glog.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active) + diff = 0 } - diff -= active jm.expectations.ExpectCreations(jobKey, diff) - glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff) + glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff) active += diff wait := sync.WaitGroup{} diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index 528bac57dab..e519c0c37e3 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -36,14 +36,12 @@ import ( var alwaysReady = func() bool { return true } func newJob(parallelism, completions int) *extensions.Job { - return &extensions.Job{ + j := &extensions.Job{ ObjectMeta: api.ObjectMeta{ Name: "foobar", Namespace: api.NamespaceDefault, }, Spec: extensions.JobSpec{ - Parallelism: ¶llelism, - Completions: &completions, Selector: &extensions.LabelSelector{ MatchLabels: map[string]string{"foo": "bar"}, }, @@ -61,6 +59,19 @@ func newJob(parallelism, completions int) *extensions.Job { }, }, } + // Special case: -1 for either completions or parallelism means leave nil (negative is not allowed + // in practice by validation. + if completions >= 0 { + j.Spec.Completions = &completions + } else { + j.Spec.Completions = nil + } + if parallelism >= 0 { + j.Spec.Parallelism = ¶llelism + } else { + j.Spec.Parallelism = nil + } + return j } func getKey(job *extensions.Job, t *testing.T) string { @@ -114,16 +125,31 @@ func TestControllerSyncJob(t *testing.T) { nil, 0, 0, 0, 2, 0, 2, 0, 0, false, }, + "WQ job start": { + 2, -1, + nil, 0, 0, 0, + 2, 0, 2, 0, 0, false, + }, "correct # of pods": { 2, 5, nil, 2, 0, 0, 0, 0, 2, 0, 0, false, }, + "WQ job: correct # of pods": { + 2, -1, + nil, 2, 0, 0, + 0, 0, 2, 0, 0, false, + }, "too few active pods": { 2, 5, nil, 1, 1, 0, 1, 0, 2, 1, 0, false, }, + "too few active pods with a dynamic job": { + 2, -1, + nil, 1, 0, 0, + 1, 0, 2, 0, 0, false, + }, "too few active pods, with controller error": { 2, 5, fmt.Errorf("Fake error"), 1, 1, 0, @@ -149,6 +175,21 @@ func TestControllerSyncJob(t *testing.T) { nil, 0, 5, 0, 0, 0, 0, 5, 0, true, }, + "WQ job finishing": { + 2, -1, + nil, 1, 1, 0, + 0, 0, 1, 1, 0, false, + }, + "WQ job all finished": { + 2, -1, + nil, 0, 2, 0, + 0, 0, 0, 2, 0, true, + }, + "WQ job all finished despite one failure": { + 2, -1, + nil, 0, 1, 1, + 0, 0, 0, 1, 1, true, + }, "more active pods than completions": { 2, 5, nil, 10, 0, 0, diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index c3a8f439e75..cc78f874ab5 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -908,7 +908,11 @@ func describeJob(job *extensions.Job, events *api.EventList) (string, error) { selector, _ := extensions.LabelSelectorAsSelector(job.Spec.Selector) fmt.Fprintf(out, "Selector:\t%s\n", selector) fmt.Fprintf(out, "Parallelism:\t%d\n", *job.Spec.Parallelism) - fmt.Fprintf(out, "Completions:\t%d\n", *job.Spec.Completions) + if job.Spec.Completions != nil { + fmt.Fprintf(out, "Completions:\t%d\n", *job.Spec.Completions) + } else { + fmt.Fprintf(out, "Completions:\tNot Set\n") + } if job.Status.StartTime != nil { fmt.Fprintf(out, "Start Time:\t%s\n", job.Status.StartTime.Time.Format(time.RFC1123Z)) }