diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 858f75d11a5..56c7c270f6c 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -47,6 +47,7 @@ import (
 	"k8s.io/component-base/metrics/prometheus/ratelimiter"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/utils/integer"
 )
@@ -139,6 +140,8 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 	jm.updateHandler = jm.updateJobStatus
 	jm.syncHandler = jm.syncJob
 
+	metrics.Register()
+
 	return jm
 }
 
@@ -440,7 +443,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *Controller) syncJob(key string) (bool, error) {
+func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
@@ -480,6 +483,21 @@ func (jm *Controller) syncJob(key string) (bool, error) {
 		return false, nil
 	}
 
+	completionMode := string(batch.NonIndexedCompletion)
+	if isIndexedJob(&job) {
+		completionMode = string(batch.IndexedCompletion)
+	}
+
+	defer func() {
+		result := "success"
+		if rErr != nil {
+			result = "error"
+		}
+
+		metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result).Observe(time.Since(startTime).Seconds())
+		metrics.JobSyncNum.WithLabelValues(completionMode, result).Inc()
+	}()
+
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
 	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
@@ -546,6 +564,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
 		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage))
 		jobConditionsChanged = true
 		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
+		metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
 	} else {
 		if jobNeedsSync && job.DeletionTimestamp == nil {
 			active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
@@ -581,6 +600,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
 			now := metav1.Now()
 			job.Status.CompletionTime = &now
 			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
+			metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
 		} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
 			// this syncJob. Otherwise wait for the right syncJob call to make
@@ -613,7 +633,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
 		}
 	}
 
-	forget := false
+	forget = false
 	// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true
 	// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to
 	// improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed.
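Note on the signature change: `syncJob` switches from `(bool, error)` to named return values `(forget bool, rErr error)` so that the deferred closure added above can read the function's final error and label `job_sync_duration_seconds` / `job_sync_total` with `result=success|error`. A minimal, self-contained sketch of that defer-plus-named-return pattern; the `recordSync` helper and the hard-coded completion mode are illustrative stand-ins, not code from this patch:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// recordSync stands in for the metrics calls in the real controller.
func recordSync(completionMode, result string, d time.Duration) {
	fmt.Printf("completion_mode=%s result=%s duration=%v\n", completionMode, result, d)
}

// syncJob mimics the named-return pattern used in the patch.
func syncJob(fail bool) (forget bool, rErr error) {
	startTime := time.Now()
	completionMode := "NonIndexed" // derived from the Job spec in the real controller

	defer func() {
		// rErr holds the final returned error here because it is a named
		// return value; an anonymous `error` return would not be visible
		// to this deferred closure.
		result := "success"
		if rErr != nil {
			result = "error"
		}
		recordSync(completionMode, result, time.Since(startTime))
	}()

	if fail {
		return false, errors.New("simulated sync failure")
	}
	return true, nil
}

func main() {
	syncJob(false) // prints result=success
	syncJob(true)  // prints result=error
}
```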
diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go
new file mode 100644
index 00000000000..5dbe4c97746
--- /dev/null
+++ b/pkg/controller/job/metrics/metrics.go
@@ -0,0 +1,75 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"sync"
+
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+// JobControllerSubsystem - subsystem name used for this controller.
+const JobControllerSubsystem = "job_controller"
+
+var (
+	// JobSyncDurationSeconds tracks the latency of job syncs, labeled by
+	// completion_mode = Indexed / NonIndexed and result = success / error.
+	JobSyncDurationSeconds = metrics.NewHistogramVec(
+		&metrics.HistogramOpts{
+			Subsystem:      JobControllerSubsystem,
+			Name:           "job_sync_duration_seconds",
+			Help:           "The time it took to sync a job",
+			StabilityLevel: metrics.ALPHA,
+			Buckets:        metrics.ExponentialBuckets(0.001, 2, 15),
+		},
+		[]string{"completion_mode", "result"},
+	)
+	// JobSyncNum tracks the number of job syncs, labeled by
+	// completion_mode = Indexed / NonIndexed and result = success / error.
+	JobSyncNum = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem:      JobControllerSubsystem,
+			Name:           "job_sync_total",
+			Help:           "The number of job syncs",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"completion_mode", "result"},
+	)
+	// JobFinishedNum tracks the number of jobs that finish, labeled by
+	// completion_mode = Indexed / NonIndexed and result = failed / succeeded.
+	JobFinishedNum = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem:      JobControllerSubsystem,
+			Name:           "job_finished_total",
+			Help:           "The number of finished jobs",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"completion_mode", "result"},
+	)
+)
+
+var registerMetrics sync.Once
+
+// Register registers Job controller metrics.
+func Register() {
+	registerMetrics.Do(func() {
+		legacyregistry.MustRegister(JobSyncDurationSeconds)
+		legacyregistry.MustRegister(JobSyncNum)
+		legacyregistry.MustRegister(JobFinishedNum)
+	})
+}
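With the `job_controller` subsystem prefix, these should surface from the controller-manager's /metrics endpoint as `job_controller_job_sync_duration_seconds`, `job_controller_job_sync_total`, and `job_controller_job_finished_total`. A rough sketch of how code elsewhere in the repo (for example a unit test) might exercise the package; the label values mirror those used in syncJob, and the repeated `Register()` call only illustrates that `sync.Once` makes registration idempotent:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/controller/job/metrics"
)

func main() {
	// Register is guarded by sync.Once, so repeated calls (e.g. from every
	// controller constructor) never hit MustRegister's duplicate-registration panic.
	metrics.Register()
	metrics.Register()

	// Recording follows the standard Prometheus vector pattern: resolve the
	// child series by label values, then Observe/Inc.
	start := time.Now()
	metrics.JobSyncDurationSeconds.WithLabelValues("NonIndexed", "success").Observe(time.Since(start).Seconds())
	metrics.JobSyncNum.WithLabelValues("NonIndexed", "success").Inc()
	metrics.JobFinishedNum.WithLabelValues("Indexed", "succeeded").Inc()

	fmt.Println("recorded one sample per metric")
}
```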