diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 5fb579b192c..b29164acefd 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -3610,7 +3610,7 @@ "description": "A label query over pods that should match the pod count. Normally, the system sets this field for you. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors" }, "suspend": { - "description": "Suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. This is an alpha field and requires the SuspendJob feature gate to be enabled; otherwise this field may not be set to true. Defaults to false.", + "description": "Suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false.\n\nThis field is beta-level, gated by SuspendJob feature flag (enabled by default).", "type": "boolean" }, "template": { diff --git a/pkg/apis/batch/types.go b/pkg/apis/batch/types.go index 77d5b6f281a..83022e72351 100644 --- a/pkg/apis/batch/types.go +++ b/pkg/apis/batch/types.go @@ -199,9 +199,11 @@ type JobSpec struct { // false to true), the Job controller will delete all active Pods associated // with this Job. Users must design their workload to gracefully handle this. // Suspending a Job will reset the StartTime field of the Job, effectively - // resetting the ActiveDeadlineSeconds timer too. This is an alpha field and - // requires the SuspendJob feature gate to be enabled; otherwise this field - // may not be set to true. Defaults to false. + // resetting the ActiveDeadlineSeconds timer too. Defaults to false. + // + // This field is beta-level, gated by SuspendJob feature flag (enabled by + // default). + // // +optional Suspend *bool } diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 5604f7c8eb6..a69e1410cf3 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -488,6 +488,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { if isIndexedJob(&job) { completionMode = string(batch.IndexedCompletion) } + action := metrics.JobSyncActionReconciling defer func() { result := "success" @@ -495,8 +496,8 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { result = "error" } - metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result).Observe(time.Since(startTime).Seconds()) - metrics.JobSyncNum.WithLabelValues(completionMode, result).Inc() + metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds()) + metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc() }() // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in @@ -568,7 +569,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc() } else { if jobNeedsSync && job.DeletionTimestamp == nil { - active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods) + active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods) manageJobCalled = true } completions := succeeded @@ -762,13 +763,13 @@ func jobSuspended(job *batch.Job) bool { // manageJob is the core method responsible for managing the number of running // pods according to what is specified in the job.Spec. // Does NOT modify . -func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, error) { +func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, string, error) { active := int32(len(activePods)) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err)) - return 0, nil + return 0, metrics.JobSyncActionTracking, nil } if jobSuspended(job) { @@ -777,7 +778,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) active -= removed - return active, err + return active, metrics.JobSyncActionPodsDeleted, err } wantActive := int32(0) @@ -812,21 +813,15 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism) removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) active -= removed - if err != nil { - return active, err - } + // While it is possible for a Job to require both pod creations and + // deletions at the same time (e.g. indexed Jobs with repeated indexes), we + // restrict ourselves to either just pod deletion or pod creation in any + // given sync cycle. Of these two, pod deletion takes precedence. + return active, metrics.JobSyncActionPodsDeleted, err } if active < wantActive { diff := wantActive - active - if diff < 0 { - utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)) - diff = 0 - } - if diff == 0 { - return active, nil - } - if diff > int32(maxPodCreateDeletePerSync) { diff = int32(maxPodCreateDeletePerSync) } @@ -909,12 +904,10 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded } diff -= batchSize } - if err := errorFromChannel(errCh); err != nil { - return active, err - } + return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh) } - return active, nil + return active, metrics.JobSyncActionTracking, nil } // activePodsForRemoval returns Pods that should be removed because there diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 156a3972d19..e1962a47522 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -555,14 +555,13 @@ func TestControllerSyncJob(t *testing.T) { {"2", v1.PodRunning}, {"2", v1.PodPending}, }, - jobKeyForget: true, - expectedCreations: 2, - expectedDeletions: 2, - expectedActive: 4, - expectedSucceeded: 1, - expectedCompletedIdxs: "0", - expectedCreatedIndexes: sets.NewInt(3, 4), - indexedJobEnabled: true, + jobKeyForget: true, + expectedCreations: 0, + expectedDeletions: 2, + expectedActive: 2, + expectedSucceeded: 1, + expectedCompletedIdxs: "0", + indexedJobEnabled: true, }, "indexed job with indexes outside of range": { parallelism: 2, @@ -576,15 +575,14 @@ func TestControllerSyncJob(t *testing.T) { {"7", v1.PodPending}, {"8", v1.PodFailed}, }, - jobKeyForget: true, - expectedCreations: 2, - expectedSucceeded: 1, - expectedDeletions: 2, - expectedCompletedIdxs: "0", - expectedCreatedIndexes: sets.NewInt(1, 2), - expectedActive: 2, - expectedFailed: 0, - indexedJobEnabled: true, + jobKeyForget: true, + expectedCreations: 0, // only one of creations and deletions can happen in a sync + expectedSucceeded: 1, + expectedDeletions: 2, + expectedCompletedIdxs: "0", + expectedActive: 0, + expectedFailed: 0, + indexedJobEnabled: true, }, "indexed job feature disabled": { parallelism: 2, diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index 5dbe4c97746..b2517a09f42 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -27,8 +27,11 @@ import ( const JobControllerSubsystem = "job_controller" var ( - // JobSyncDurationSeconds tracks the latency of job syncs as - // completion_mode = Indexed / NonIndexed and result = success / error. + // JobSyncDurationSeconds tracks the latency of Job syncs. Possible label + // values: + // completion_mode: Indexed, NonIndexed + // result: success, error + // action: reconciling, tracking, pods_created, pods_deleted JobSyncDurationSeconds = metrics.NewHistogramVec( &metrics.HistogramOpts{ Subsystem: JobControllerSubsystem, @@ -37,10 +40,12 @@ var ( StabilityLevel: metrics.ALPHA, Buckets: metrics.ExponentialBuckets(0.001, 2, 15), }, - []string{"completion_mode", "result"}, + []string{"completion_mode", "result", "action"}, ) - // JobSyncNum tracks the number of job syncs as - // completion_mode = Indexed / NonIndexed and result = success / error. + // JobSyncNum tracks the number of Job syncs. Possible label values: + // completion_mode: Indexed, NonIndexed + // result: success, error + // action: reconciling, tracking, pods_created, pods_deleted JobSyncNum = metrics.NewCounterVec( &metrics.CounterOpts{ Subsystem: JobControllerSubsystem, @@ -48,10 +53,12 @@ var ( Help: "The number of job syncs", StabilityLevel: metrics.ALPHA, }, - []string{"completion_mode", "result"}, + []string{"completion_mode", "result", "action"}, ) - // JobFinishedNum tracks the number of jobs that finish as - // completion_mode = Indexed / NonIndexed and result = failed / succeeded. + // JobFinishedNum tracks the number of Jobs that finish. Possible label + // values: + // completion_mode: Indexed, NonIndexed + // result: failed, succeeded JobFinishedNum = metrics.NewCounterVec( &metrics.CounterOpts{ Subsystem: JobControllerSubsystem, @@ -63,6 +70,26 @@ var ( ) ) +// Possible values for the "action" label in the above metrics. +const ( + // JobSyncActionReconciling when the Job's pod creation/deletion expectations + // are unsatisfied and the controller is waiting for issued Pod + // creation/deletions to complete. + JobSyncActionReconciling = "reconciling" + // JobSyncActionTracking when the Job's pod creation/deletion expectations + // are satisfied and the number of active Pods matches expectations (i.e. no + // pod creation/deletions issued in this sync). This is expected to be the + // action in most of the syncs. + JobSyncActionTracking = "tracking" + // JobSyncActionPodsCreated when the controller creates Pods. This can happen + // when the number of active Pods is less than the wanted Job parallelism. + JobSyncActionPodsCreated = "pods_created" + // JobSyncActionPodsDeleted when the controller deletes Pods. This can happen + // if a Job is suspended or if the number of active Pods is more than + // parallelism. + JobSyncActionPodsDeleted = "pods_deleted" +) + var registerMetrics sync.Once // Register registers Job controller metrics. diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 70692586094..e299fb022a4 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -678,6 +678,7 @@ const ( // owner: @adtac // alpha: v1.21 + // beta: v1.22 // // Allows jobs to be created in the suspended state. SuspendJob featuregate.Feature = "SuspendJob" @@ -820,7 +821,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS LogarithmicScaleDown: {Default: false, PreRelease: featuregate.Alpha}, IngressClassNamespacedParams: {Default: false, PreRelease: featuregate.Alpha}, ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha}, - SuspendJob: {Default: false, PreRelease: featuregate.Alpha}, + SuspendJob: {Default: true, PreRelease: featuregate.Beta}, KubeletPodResourcesGetAllocatable: {Default: false, PreRelease: featuregate.Alpha}, NamespaceDefaultLabelName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.24 CSIVolumeHealth: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/api/batch/v1/generated.proto b/staging/src/k8s.io/api/batch/v1/generated.proto index c786d6ae25c..12092565ccb 100644 --- a/staging/src/k8s.io/api/batch/v1/generated.proto +++ b/staging/src/k8s.io/api/batch/v1/generated.proto @@ -262,9 +262,11 @@ message JobSpec { // false to true), the Job controller will delete all active Pods associated // with this Job. Users must design their workload to gracefully handle this. // Suspending a Job will reset the StartTime field of the Job, effectively - // resetting the ActiveDeadlineSeconds timer too. This is an alpha field and - // requires the SuspendJob feature gate to be enabled; otherwise this field - // may not be set to true. Defaults to false. + // resetting the ActiveDeadlineSeconds timer too. Defaults to false. + // + // This field is beta-level, gated by SuspendJob feature flag (enabled by + // default). + // // +optional optional bool suspend = 10; } diff --git a/staging/src/k8s.io/api/batch/v1/types.go b/staging/src/k8s.io/api/batch/v1/types.go index caf0374c174..1803fabfd84 100644 --- a/staging/src/k8s.io/api/batch/v1/types.go +++ b/staging/src/k8s.io/api/batch/v1/types.go @@ -178,9 +178,11 @@ type JobSpec struct { // false to true), the Job controller will delete all active Pods associated // with this Job. Users must design their workload to gracefully handle this. // Suspending a Job will reset the StartTime field of the Job, effectively - // resetting the ActiveDeadlineSeconds timer too. This is an alpha field and - // requires the SuspendJob feature gate to be enabled; otherwise this field - // may not be set to true. Defaults to false. + // resetting the ActiveDeadlineSeconds timer too. Defaults to false. + // + // This field is beta-level, gated by SuspendJob feature flag (enabled by + // default). + // // +optional Suspend *bool `json:"suspend,omitempty" protobuf:"varint,10,opt,name=suspend"` } diff --git a/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go b/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go index f522b01a117..da3ef6a24a2 100644 --- a/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go +++ b/staging/src/k8s.io/api/batch/v1/types_swagger_doc_generated.go @@ -120,7 +120,7 @@ var map_JobSpec = map[string]string{ "template": "Describes the pod that will be created when executing a job. More info: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/", "ttlSecondsAfterFinished": "ttlSecondsAfterFinished limits the lifetime of a Job that has finished execution (either Complete or Failed). If this field is set, ttlSecondsAfterFinished after the Job finishes, it is eligible to be automatically deleted. When the Job is being deleted, its lifecycle guarantees (e.g. finalizers) will be honored. If this field is unset, the Job won't be automatically deleted. If this field is set to zero, the Job becomes eligible to be deleted immediately after it finishes. This field is alpha-level and is only honored by servers that enable the TTLAfterFinished feature.", "completionMode": "CompletionMode specifies how Pod completions are tracked. It can be `NonIndexed` (default) or `Indexed`.\n\n`NonIndexed` means that the Job is considered complete when there have been .spec.completions successfully completed Pods. Each Pod completion is homologous to each other.\n\n`Indexed` means that the Pods of a Job get an associated completion index from 0 to (.spec.completions - 1), available in the annotation batch.kubernetes.io/job-completion-index. The Job is considered complete when there is one successfully completed Pod for each index. When value is `Indexed`, .spec.completions must be specified and `.spec.parallelism` must be less than or equal to 10^5. In addition, The Pod name takes the form `$(job-name)-$(index)-$(random-string)`, the Pod hostname takes the form `$(job-name)-$(index)`.\n\nThis field is beta-level. More completion modes can be added in the future. If the Job controller observes a mode that it doesn't recognize, the controller skips updates for the Job.", - "suspend": "Suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. This is an alpha field and requires the SuspendJob feature gate to be enabled; otherwise this field may not be set to true. Defaults to false.", + "suspend": "Suspend specifies whether the Job controller should create Pods or not. If a Job is created with suspend set to true, no Pods are created by the Job controller. If a Job is suspended after creation (i.e. the flag goes from false to true), the Job controller will delete all active Pods associated with this Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the Job, effectively resetting the ActiveDeadlineSeconds timer too. Defaults to false.\n\nThis field is beta-level, gated by SuspendJob feature flag (enabled by default).", } func (JobSpec) SwaggerDoc() map[string]string { diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index e676cab0b1d..469d0060ce9 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -70,10 +70,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectEqual(successes, completions, "epected %d successful job pods, but got %d", completions, successes) }) - // Requires the alpha level feature gate SuspendJob. This e2e test will not - // pass without the following flag being passed to kubetest: - // --test_args="--feature-gates=SuspendJob=true" - ginkgo.It("[Feature:SuspendJob] should not create pods when created in suspend state", func() { + ginkgo.It("should not create pods when created in suspend state", func() { ginkgo.By("Creating a job with suspend=true") job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) job.Spec.Suspend = pointer.BoolPtr(true) @@ -111,10 +108,7 @@ var _ = SIGDescribe("Job", func() { framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name) }) - // Requires the alpha level feature gate SuspendJob. This e2e test will not - // pass without the following flag being passed to kubetest: - // --test_args="--feature-gates=SuspendJob=true" - ginkgo.It("[Feature:SuspendJob] should delete pods when suspended", func() { + ginkgo.It("should delete pods when suspended", func() { ginkgo.By("Creating a job with suspend=false") job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) job.Spec.Suspend = pointer.BoolPtr(false)