Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-02 00:07:50 +00:00)
Merge pull request #112948 from mimowo/112873-fix-job-finished-metric
Fix the job finished metric issue due to the final job status update occasionally failing
Commit 9bedff1147
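In short: before this change, syncJob incremented the job_finished_total metric while enacting the finished condition, i.e. before the final status update; when updateStatusHandler failed and the sync was retried, the counter could diverge from the status that was actually persisted. The hunks below (in pkg/controller/job/job_controller.go) split the work: enactJobFinished now only mutates the in-memory status, and a new recordJobFinished emits the events and metrics, called only after the update succeeds. A minimal, dependency-free Go sketch of that ordering — updateStatus and syncFinishedJob are hypothetical stand-ins, not the controller's code:

package main

import (
	"errors"
	"fmt"
)

// jobFinishedTotal stands in for the job_finished_total counter.
var jobFinishedTotal int

// updateStatus is a hypothetical stand-in for the controller's
// updateStatusHandler call, which can transiently fail.
func updateStatus(fail bool) error {
	if fail {
		return errors.New("transient apiserver error")
	}
	return nil
}

// syncFinishedJob mirrors the fixed ordering: enact the finished condition
// in memory, persist it, and only then record the metric.
func syncFinishedJob(updateFails bool) error {
	jobFinished := true // what enactJobFinished would report
	if err := updateStatus(updateFails); err != nil {
		return err // the sync is retried later; the counter stays untouched
	}
	if jobFinished {
		jobFinishedTotal++ // what recordJobFinished does
	}
	return nil
}

func main() {
	_ = syncFinishedJob(true)     // first attempt fails: no increment
	_ = syncFinishedJob(false)    // retry succeeds: increment exactly once
	fmt.Println(jobFinishedTotal) // 1
}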
pkg/controller/job/job_controller.go
@@ -699,10 +699,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 		return false, nil
 	}
 
-	completionMode := string(batch.NonIndexedCompletion)
-	if isIndexedJob(&job) {
-		completionMode = string(batch.IndexedCompletion)
-	}
+	completionMode := getCompletionMode(&job)
 	action := metrics.JobSyncActionReconciling
 
 	defer func() {
@@ -906,11 +903,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 			job.Status.CompletedIndexes = succeededIndexes.String()
 		}
 		job.Status.UncountedTerminatedPods = nil
-		jm.enactJobFinished(&job, finishedCondition)
+		jobFinished := jm.enactJobFinished(&job, finishedCondition)
 
 		if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
 			return forget, err
 		}
+		if jobFinished {
+			jm.recordJobFinished(&job, finishedCondition)
+		}
 
 		if jobHasNewFailure && !IsJobFinished(&job) {
 			// returning an error will re-enqueue Job after the backoff period
@@ -1105,13 +1105,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
-	if jm.enactJobFinished(job, finishedCond) {
+	jobFinished := jm.enactJobFinished(job, finishedCond)
+	if jobFinished {
 		needsFlush = true
 	}
 	if needsFlush {
 		if _, err := jm.updateStatusHandler(ctx, job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
+		if jobFinished {
+			jm.recordJobFinished(job, finishedCond)
+		}
 		recordJobPodFinished(job, oldCounters)
 	}
 	return nil
@@ -1244,16 +1248,20 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
 			return false
 		}
 	}
-	completionMode := string(batch.NonIndexedCompletion)
-	if isIndexedJob(job) {
-		completionMode = string(*job.Spec.CompletionMode)
-	}
 	job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message)
+	if finishedCond.Type == batch.JobComplete {
+		job.Status.CompletionTime = &finishedCond.LastTransitionTime
+	}
+	return true
+}
+
+// recordJobFinished records events and the job_finished_total metric for a finished job.
+func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
+	completionMode := getCompletionMode(job)
 	if finishedCond.Type == batch.JobComplete {
 		if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
 			jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
 		}
-		job.Status.CompletionTime = &finishedCond.LastTransitionTime
 		jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
 		metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
 	} else {
@@ -1613,6 +1621,14 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str
 	return result
 }
 
+// getCompletionMode returns string representation of the completion mode. Used as a label value for metrics.
+func getCompletionMode(job *batch.Job) string {
+	if isIndexedJob(job) {
+		return string(batch.IndexedCompletion)
+	}
+	return string(batch.NonIndexedCompletion)
+}
+
 func trackingUncountedPods(job *batch.Job) bool {
 	return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job)
 }
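As a reference for the label values this helper produces (and which the integration test below asserts on), a self-contained sketch with simplified types — the real helper takes a *batch.Job and calls isIndexedJob:

package main

import "fmt"

// Simplified stand-in for batch.CompletionMode; the constant values match
// the batch API strings ("NonIndexed", "Indexed"), as the test labels confirm.
type CompletionMode string

const (
	NonIndexedCompletion CompletionMode = "NonIndexed"
	IndexedCompletion    CompletionMode = "Indexed"
)

// getCompletionMode mirrors the new helper: indexed jobs map to "Indexed",
// everything else to "NonIndexed".
func getCompletionMode(indexed bool) string {
	if indexed {
		return string(IndexedCompletion)
	}
	return string(NonIndexedCompletion)
}

func main() {
	fmt.Println(getCompletionMode(true))  // Indexed
	fmt.Println(getCompletionMode(false)) // NonIndexed
}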
test/integration/job/job_test.go
@@ -47,12 +47,15 @@ import (
 	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	basemetrics "k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/klog/v2"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
+	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/utils/pointer"
@@ -60,6 +63,138 @@ import (
 
 const waitInterval = time.Second
 
+type metricLabelsWithValue struct {
+	Labels []string
+	Value  int
+}
+
+func TestMetrics(t *testing.T) {
+	nonIndexedCompletion := batchv1.NonIndexedCompletion
+	indexedCompletion := batchv1.IndexedCompletion
+	wFinalizers := true
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
+
+	// setup the job controller
+	closeFn, restConfig, clientSet, ns := setup(t, "simple")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer func() {
+		cancel()
+	}()
+
+	testCases := map[string]struct {
+		job                            *batchv1.Job
+		wantJobFinishedNumMetricDelta  metricLabelsWithValue
+		wantJobPodsFinishedMetricDelta metricLabelsWithValue
+	}{
+		"non-indexed job": {
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Completions:    pointer.Int32(2),
+					Parallelism:    pointer.Int32(2),
+					CompletionMode: &nonIndexedCompletion,
+				},
+			},
+			wantJobFinishedNumMetricDelta: metricLabelsWithValue{
+				Labels: []string{"NonIndexed", "succeeded"},
+				Value:  1,
+			},
+			wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
+				Labels: []string{"NonIndexed", "succeeded"},
+				Value:  2,
+			},
+		},
+		"indexed job": {
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Completions:    pointer.Int32(2),
+					Parallelism:    pointer.Int32(2),
+					CompletionMode: &indexedCompletion,
+				},
+			},
+			wantJobFinishedNumMetricDelta: metricLabelsWithValue{
+				Labels: []string{"Indexed", "succeeded"},
+				Value:  1,
+			},
+			wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
+				Labels: []string{"Indexed", "succeeded"},
+				Value:  2,
+			},
+		},
+	}
+	job_index := 0 // job index to avoid collisions between job names created by different test cases
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+
+			// record the metrics before the job is created
+			jobFinishedNumBefore, err := getCounterMetricValueForLabels(metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta.Labels)
+			if err != nil {
+				t.Fatalf("Failed to collect the JobFinishedNum metric before creating the job: %q", err)
+			}
+			jobPodsFinishedBefore, err := getCounterMetricValueForLabels(metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta.Labels)
+			if err != nil {
+				t.Fatalf("Failed to collect the JobPodsFinished metric before creating the job: %q", err)
+			}
+
+			// create a single job and wait for its completion
+			job := tc.job.DeepCopy()
+			job.Name = fmt.Sprintf("job-%v", job_index)
+			job_index++
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, job)
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+				Active: int(*jobObj.Spec.Parallelism),
+				Ready:  pointer.Int32(0),
+			}, wFinalizers)
+			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil {
+				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+			}
+			validateJobSucceeded(ctx, t, clientSet, jobObj)
+
+			// verify metric values after the job is finished
+			validateMetricValueDeltas(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta, jobFinishedNumBefore)
+			validateMetricValueDeltas(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta, jobPodsFinishedBefore)
+		})
+	}
+}
+
+func validateMetricValueDeltas(t *testing.T, counterVer *basemetrics.CounterVec, wantMetricDelta metricLabelsWithValue, metricValuesBefore metricLabelsWithValue) {
+	t.Helper()
+	var cmpErr error
+	err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
+		cmpErr = nil
+		metricValuesAfter, err := getCounterMetricValueForLabels(counterVer, wantMetricDelta.Labels)
+		if err != nil {
+			return true, fmt.Errorf("Failed to collect the %q metric after the job is finished: %q", counterVer.Name, err)
+		}
+		wantDelta := wantMetricDelta.Value
+		gotDelta := metricValuesAfter.Value - metricValuesBefore.Value
+		if wantDelta != gotDelta {
+			cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVer.Name, wantMetricDelta.Labels, wantDelta, gotDelta)
+			return false, nil
+		}
+		return true, nil
+	})
+	if err != nil {
+		t.Errorf("Failed waiting for expected metric delta: %q", err)
+	}
+	if cmpErr != nil {
+		t.Error(cmpErr)
+	}
+}
+
+func getCounterMetricValueForLabels(counterVec *basemetrics.CounterVec, labels []string) (metricLabelsWithValue, error) {
+	var result metricLabelsWithValue = metricLabelsWithValue{Labels: labels}
+	value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(labels...))
+	if err != nil {
+		return result, err
+	}
+	result.Value = int(value)
+	return result, nil
+}
+
 // TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
 // in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
 // the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is
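The test asserts metric deltas rather than absolute values, because the counters are process-global and earlier tests may already have incremented them; it also polls with wait.PollImmediate, since the metric is recorded after the status update rather than atomically with it. A dependency-free sketch of the delta idea (the poll loop is omitted):

package main

import "fmt"

func main() {
	counter := 5      // increments left over from earlier tests in the process
	before := counter // reading taken before creating the job
	counter++         // the job under test finishes exactly once
	after := counter  // reading taken after the job succeeded

	if got, want := after-before, 1; got != want {
		fmt.Printf("unexpected delta: got %d, want %d\n", got, want)
	} else {
		fmt.Println("delta matches") // what validateMetricValueDeltas checks
	}
}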