mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Revert "Revert "Add metric job_pod_finished""
This reverts commit 7868fbbe64
.
This commit is contained in:
parent
47a957d163
commit
a438f16741
@ -338,3 +338,10 @@ func (bci byCompletionIndex) Swap(i, j int) {
|
||||
func (bci byCompletionIndex) Len() int {
|
||||
return len(bci)
|
||||
}
|
||||
|
||||
func completionModeStr(job *batch.Job) string {
|
||||
if job.Spec.CompletionMode != nil {
|
||||
return string(*job.Spec.CompletionMode)
|
||||
}
|
||||
return string(batch.NonIndexedCompletion)
|
||||
}
|
||||
|
@ -900,6 +900,8 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
|
||||
uidsWithFinalizer.Insert(string(p.UID))
|
||||
}
|
||||
}
|
||||
// Shallow copy, as it will only be used to detect changes in the counters.
|
||||
oldCounters := job.Status
|
||||
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
|
||||
needsFlush = true
|
||||
}
|
||||
@ -954,7 +956,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
|
||||
job.Status.CompletedIndexes = succeededIndexes.String()
|
||||
}
|
||||
var err error
|
||||
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
|
||||
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
|
||||
return err
|
||||
}
|
||||
if jm.enactJobFinished(job, finishedCond) {
|
||||
@ -964,6 +966,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
|
||||
if _, err := jm.updateStatusHandler(job); err != nil {
|
||||
return fmt.Errorf("removing uncounted pods from status: %w", err)
|
||||
}
|
||||
recordJobPodFinished(job, oldCounters)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -977,12 +980,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
|
||||
// 4. (if not all removals succeeded) flush Job status again.
|
||||
// Returns whether there are pending changes in the Job status that need to be
|
||||
// flushed in subsequent calls.
|
||||
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) {
|
||||
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
|
||||
var err error
|
||||
if needsFlush {
|
||||
if job, err = jm.updateStatusHandler(job); err != nil {
|
||||
return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
|
||||
}
|
||||
recordJobPodFinished(job, *oldCounters)
|
||||
// Shallow copy.
|
||||
*oldCounters = job.Status
|
||||
needsFlush = false
|
||||
}
|
||||
var rmErr error
|
||||
@ -1536,3 +1542,11 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
|
||||
}
|
||||
return list, false
|
||||
}
|
||||
|
||||
func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
|
||||
completionMode := completionModeStr(job)
|
||||
diff := job.Status.Succeeded - oldCounters.Succeeded
|
||||
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff))
|
||||
diff = job.Status.Failed - oldCounters.Failed
|
||||
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
|
||||
}
|
||||
|
@ -47,8 +47,10 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
metricstestutil "k8s.io/component-base/metrics/testutil"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/job/metrics"
|
||||
"k8s.io/kubernetes/pkg/controller/testutil"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/utils/pointer"
|
||||
@ -1515,19 +1517,20 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
|
||||
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
|
||||
fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
|
||||
metrics.JobPodsFinished.Reset()
|
||||
manager.podControl = &fakePodControl
|
||||
var statusUpdates []batch.JobStatus
|
||||
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
|
||||
statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
|
||||
return job, tc.statusUpdateErr
|
||||
}
|
||||
|
||||
if tc.job.Status.UncountedTerminatedPods == nil {
|
||||
tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
|
||||
job := tc.job.DeepCopy()
|
||||
if job.Status.UncountedTerminatedPods == nil {
|
||||
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
|
||||
}
|
||||
uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
|
||||
succeededIndexes := succeededIndexesFromJob(&tc.job)
|
||||
err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
|
||||
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
|
||||
succeededIndexes := succeededIndexesFromJob(job)
|
||||
err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
|
||||
if !errors.Is(err, tc.wantErr) {
|
||||
t.Errorf("Got error %v, want %v", err, tc.wantErr)
|
||||
}
|
||||
@ -1538,6 +1541,25 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
|
||||
if rmFinalizers != tc.wantRmFinalizers {
|
||||
t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers)
|
||||
}
|
||||
if tc.wantErr == nil {
|
||||
completionMode := completionModeStr(job)
|
||||
v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded))
|
||||
if err != nil {
|
||||
t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err)
|
||||
}
|
||||
newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded
|
||||
if float64(newSucceeded) != v {
|
||||
t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded)
|
||||
}
|
||||
v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed))
|
||||
if err != nil {
|
||||
t.Fatalf("Obtaining failed job_pods_finished_total: %v", err)
|
||||
}
|
||||
newFailed := job.Status.Failed - tc.job.Status.Failed
|
||||
if float64(newFailed) != v {
|
||||
t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -68,10 +68,26 @@ var (
|
||||
},
|
||||
[]string{"completion_mode", "result"},
|
||||
)
|
||||
|
||||
// JobPodsFinished records the number of finished Pods that the job controller
|
||||
// finished tracking.
|
||||
// It only applies to Jobs that were created while the feature gate
|
||||
// JobTrackingWithFinalizers was enabled.
|
||||
// Possible label values:
|
||||
// completion_mode: Indexed, NonIndexed
|
||||
// result: failed, succeeded
|
||||
JobPodsFinished = metrics.NewCounterVec(
|
||||
&metrics.CounterOpts{
|
||||
Subsystem: JobControllerSubsystem,
|
||||
Name: "job_pods_finished_total",
|
||||
Help: "The number of finished Pods that are fully tracked",
|
||||
},
|
||||
[]string{"completion_mode", "result"})
|
||||
)
|
||||
|
||||
// Possible values for the "action" label in the above metrics.
|
||||
const (
|
||||
// Possible values for the "action" label in the above metrics.
|
||||
|
||||
// JobSyncActionReconciling when the Job's pod creation/deletion expectations
|
||||
// are unsatisfied and the controller is waiting for issued Pod
|
||||
// creation/deletions to complete.
|
||||
@ -88,6 +104,11 @@ const (
|
||||
// if a Job is suspended or if the number of active Pods is more than
|
||||
// parallelism.
|
||||
JobSyncActionPodsDeleted = "pods_deleted"
|
||||
|
||||
// Possible values for "result" label in the above metrics.
|
||||
|
||||
Succeeded = "succeeded"
|
||||
Failed = "failed"
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
@ -98,5 +119,6 @@ func Register() {
|
||||
legacyregistry.MustRegister(JobSyncDurationSeconds)
|
||||
legacyregistry.MustRegister(JobSyncNum)
|
||||
legacyregistry.MustRegister(JobFinishedNum)
|
||||
legacyregistry.MustRegister(JobPodsFinished)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user