Extend metrics with the new labels (#113324)

* Extend job metrics

* Refactor TestMetrics to extract its checks into dedicated tests per feature
This commit is contained in:
Michał Woźniak 2022-10-31 16:50:45 +01:00 committed by GitHub
parent 4d2128b523
commit 3628532311
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 292 additions and 42 deletions

View File

@ -1031,6 +1031,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true needsFlush = true
} }
podFailureCountByPolicyAction := map[string]int{}
for _, pod := range pods { for _, pod := range pods {
if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) { if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) {
continue continue
@ -1061,7 +1062,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
ix := getCompletionIndex(pod.Annotations) ix := getCompletionIndex(pod.Annotations)
if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) { if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) {
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
_, countFailed := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod) _, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
if action != nil {
podFailureCountByPolicyAction[string(*action)] += 1
}
if countFailed { if countFailed {
needsFlush = true needsFlush = true
uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID) uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
@ -1102,7 +1106,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
} }
} }
var err error var err error
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
return err return err
} }
jobFinished := jm.enactJobFinished(job, finishedCond) jobFinished := jm.enactJobFinished(job, finishedCond)
@ -1132,7 +1136,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
// //
// Returns whether there are pending changes in the Job status that need to be // Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls. // flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) { func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
var err error var err error
if needsFlush { if needsFlush {
if job, err = jm.updateStatusHandler(ctx, job); err != nil { if job, err = jm.updateStatusHandler(ctx, job); err != nil {
@ -1143,6 +1147,8 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
*oldCounters = job.Status *oldCounters = job.Status
needsFlush = false needsFlush = false
} }
recordJobPodFailurePolicyActions(job, podFailureCountByPolicyAction)
jobKey, err := controller.KeyFunc(job) jobKey, err := controller.KeyFunc(job)
if err != nil { if err != nil {
return job, needsFlush, fmt.Errorf("getting job key: %w", err) return job, needsFlush, fmt.Errorf("getting job key: %w", err)
@ -1263,10 +1269,10 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC
jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
} }
jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed") jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc() metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded", "").Inc()
} else { } else {
jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message) jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message)
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc() metrics.JobFinishedNum.WithLabelValues(completionMode, "failed", finishedCond.Reason).Inc()
} }
return true return true
} }
@ -1345,7 +1351,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
} }
for _, p := range pods { for _, p := range pods {
if isPodFailed(p, uncounted != nil) { if isPodFailed(p, uncounted != nil) {
jobFailureMessage, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p) jobFailureMessage, _, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
if jobFailureMessage != nil { if jobFailureMessage != nil {
return jobFailureMessage return jobFailureMessage
} }
@ -1369,7 +1375,7 @@ func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPod
if !isPodFailed(p, uncounted != nil) { if !isPodFailed(p, uncounted != nil) {
return false return false
} }
_, countFailed := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p) _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
return countFailed return countFailed
} else { } else {
return isPodFailed(p, uncounted != nil) return isPodFailed(p, uncounted != nil)
@ -1768,6 +1774,12 @@ func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff)) metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
} }
// recordJobPodFailurePolicyActions adds, for every pod failure policy action
// present in podFailureCountByPolicyAction, the associated number of pod
// failures to the PodFailuresHandledByFailurePolicy counter under the label
// value of that action. The job argument is not read by this function.
func recordJobPodFailurePolicyActions(job *batch.Job, podFailureCountByPolicyAction map[string]int) {
	for policyAction, failures := range podFailureCountByPolicyAction {
		counter := metrics.PodFailuresHandledByFailurePolicy.WithLabelValues(policyAction)
		counter.Add(float64(failures))
	}
}
func countReadyPods(pods []*v1.Pod) int32 { func countReadyPods(pods []*v1.Pod) int32 {
cnt := int32(0) cnt := int32(0)
for _, p := range pods { for _, p := range pods {

View File

@ -55,10 +55,12 @@ var (
}, },
[]string{"completion_mode", "result", "action"}, []string{"completion_mode", "result", "action"},
) )
// JobFinishedNum tracks the number of Jobs that finish. Possible label // JobFinishedNum tracks the number of Jobs that finish. Empty reason label
// values: // is used to count successful jobs.
// Possible label values:
// completion_mode: Indexed, NonIndexed // completion_mode: Indexed, NonIndexed
// result: failed, succeeded // result: failed, succeeded
// reason: "BackoffLimitExceeded", "DeadlineExceeded", "PodFailurePolicy", ""
JobFinishedNum = metrics.NewCounterVec( JobFinishedNum = metrics.NewCounterVec(
&metrics.CounterOpts{ &metrics.CounterOpts{
Subsystem: JobControllerSubsystem, Subsystem: JobControllerSubsystem,
@ -66,7 +68,7 @@ var (
Help: "The number of finished job", Help: "The number of finished job",
StabilityLevel: metrics.ALPHA, StabilityLevel: metrics.ALPHA,
}, },
[]string{"completion_mode", "result"}, []string{"completion_mode", "result", "reason"},
) )
// JobPodsFinished records the number of finished Pods that the job controller // JobPodsFinished records the number of finished Pods that the job controller
@ -84,6 +86,22 @@ var (
}, },
[]string{"completion_mode", "result"}) []string{"completion_mode", "result"})
// PodFailuresHandledByFailurePolicy records the number of failed Pods
// handled by pod failure policy, partitioned by the action of the matched
// rule.
// Possible label values:
//   action: FailJob, Ignore, Count
PodFailuresHandledByFailurePolicy = metrics.NewCounterVec(
	&metrics.CounterOpts{
		Subsystem: JobControllerSubsystem,
		Name:      "pod_failures_handled_by_failure_policy_total",
		Help: `The number of failed Pods handled by failure policy with
respect to the failure policy action applied based on the matched
rule. Possible values of the action label correspond to the
possible values for the failure policy rule action, which are:
"FailJob", "Ignore" and "Count".`,
		// Stated explicitly so this declaration agrees with the other
		// job controller metrics declared alongside it.
		StabilityLevel: metrics.ALPHA,
	},
	[]string{"action"})
// TerminatedPodsWithTrackingFinalizer records the addition and removal of // TerminatedPodsWithTrackingFinalizer records the addition and removal of
// terminated pods that have the finalizer batch.kubernetes.io/job-tracking, // terminated pods that have the finalizer batch.kubernetes.io/job-tracking,
// regardless of whether they are owned by a Job. // regardless of whether they are owned by a Job.
@ -137,6 +155,7 @@ func Register() {
legacyregistry.MustRegister(JobSyncNum) legacyregistry.MustRegister(JobSyncNum)
legacyregistry.MustRegister(JobFinishedNum) legacyregistry.MustRegister(JobFinishedNum)
legacyregistry.MustRegister(JobPodsFinished) legacyregistry.MustRegister(JobPodsFinished)
legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal) legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
}) })
} }

View File

@ -26,42 +26,46 @@ import (
// matchPodFailurePolicy returns information about matching a given failed pod // matchPodFailurePolicy returns information about matching a given failed pod
// against the pod failure policy rules. The information is represented as an // against the pod failure policy rules. The information is represented as an
// optional job failure message (present in case the pod matched a 'FailJob' // optional job failure message (present in case the pod matched a 'FailJob'
// rule) and a boolean indicating if the failure should be counted towards // rule), a boolean indicating if the failure should be counted towards
// backoffLimit (it should not be counted if the pod matched an 'Ignore' rule). // backoffLimit (it should not be counted if the pod matched an 'Ignore' rule),
func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *v1.Pod) (*string, bool) { // and a pointer to the matched pod failure policy action.
func matchPodFailurePolicy(podFailurePolicy *batch.PodFailurePolicy, failedPod *v1.Pod) (*string, bool, *batch.PodFailurePolicyAction) {
if podFailurePolicy == nil { if podFailurePolicy == nil {
return nil, true return nil, true, nil
} }
ignore := batch.PodFailurePolicyActionIgnore
failJob := batch.PodFailurePolicyActionFailJob
count := batch.PodFailurePolicyActionCount
for index, podFailurePolicyRule := range podFailurePolicy.Rules { for index, podFailurePolicyRule := range podFailurePolicy.Rules {
if podFailurePolicyRule.OnExitCodes != nil { if podFailurePolicyRule.OnExitCodes != nil {
if containerStatus := matchOnExitCodes(&failedPod.Status, podFailurePolicyRule.OnExitCodes); containerStatus != nil { if containerStatus := matchOnExitCodes(&failedPod.Status, podFailurePolicyRule.OnExitCodes); containerStatus != nil {
switch podFailurePolicyRule.Action { switch podFailurePolicyRule.Action {
case batch.PodFailurePolicyActionIgnore: case batch.PodFailurePolicyActionIgnore:
return nil, false return nil, false, &ignore
case batch.PodFailurePolicyActionCount: case batch.PodFailurePolicyActionCount:
return nil, true return nil, true, &count
case batch.PodFailurePolicyActionFailJob: case batch.PodFailurePolicyActionFailJob:
msg := fmt.Sprintf("Container %s for pod %s/%s failed with exit code %v matching %v rule at index %d", msg := fmt.Sprintf("Container %s for pod %s/%s failed with exit code %v matching %v rule at index %d",
containerStatus.Name, failedPod.Namespace, failedPod.Name, containerStatus.State.Terminated.ExitCode, podFailurePolicyRule.Action, index) containerStatus.Name, failedPod.Namespace, failedPod.Name, containerStatus.State.Terminated.ExitCode, podFailurePolicyRule.Action, index)
return &msg, true return &msg, true, &failJob
} }
} }
} else if podFailurePolicyRule.OnPodConditions != nil { } else if podFailurePolicyRule.OnPodConditions != nil {
if podCondition := matchOnPodConditions(&failedPod.Status, podFailurePolicyRule.OnPodConditions); podCondition != nil { if podCondition := matchOnPodConditions(&failedPod.Status, podFailurePolicyRule.OnPodConditions); podCondition != nil {
switch podFailurePolicyRule.Action { switch podFailurePolicyRule.Action {
case batch.PodFailurePolicyActionIgnore: case batch.PodFailurePolicyActionIgnore:
return nil, false return nil, false, &ignore
case batch.PodFailurePolicyActionCount: case batch.PodFailurePolicyActionCount:
return nil, true return nil, true, &count
case batch.PodFailurePolicyActionFailJob: case batch.PodFailurePolicyActionFailJob:
msg := fmt.Sprintf("Pod %s/%s has condition %v matching %v rule at index %d", msg := fmt.Sprintf("Pod %s/%s has condition %v matching %v rule at index %d",
failedPod.Namespace, failedPod.Name, podCondition.Type, podFailurePolicyRule.Action, index) failedPod.Namespace, failedPod.Name, podCondition.Type, podFailurePolicyRule.Action, index)
return &msg, true return &msg, true, &failJob
} }
} }
} }
} }
return nil, true return nil, true, nil
} }
func matchOnExitCodes(podStatus *v1.PodStatus, requirement *batch.PodFailurePolicyOnExitCodesRequirement) *v1.ContainerStatus { func matchOnExitCodes(podStatus *v1.PodStatus, requirement *batch.PodFailurePolicyOnExitCodesRequirement) *v1.ContainerStatus {

View File

@ -19,6 +19,7 @@ package job
import ( import (
"testing" "testing"
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1" batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -31,12 +32,16 @@ func TestMatchPodFailurePolicy(t *testing.T) {
Namespace: "default", Namespace: "default",
Name: "mypod", Name: "mypod",
} }
ignore := batch.PodFailurePolicyActionIgnore
failJob := batch.PodFailurePolicyActionFailJob
count := batch.PodFailurePolicyActionCount
testCases := map[string]struct { testCases := map[string]struct {
podFailurePolicy *batch.PodFailurePolicy podFailurePolicy *batch.PodFailurePolicy
failedPod *v1.Pod failedPod *v1.Pod
wantJobFailureMessage *string wantJobFailureMessage *string
wantCountFailed bool wantCountFailed bool
wantAction *batch.PodFailurePolicyAction
}{ }{
"unknown action for rule matching by exit codes - skip rule with unknown action": { "unknown action for rule matching by exit codes - skip rule with unknown action": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -75,6 +80,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 1"), wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 1"),
wantCountFailed: true, wantCountFailed: true,
wantAction: &failJob,
}, },
"unknown action for rule matching by pod conditions - skip rule with unknown action": { "unknown action for rule matching by pod conditions - skip rule with unknown action": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -113,6 +119,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: false, wantCountFailed: false,
wantAction: &ignore,
}, },
"unknown operator - rule with unknown action is skipped for onExitCodes": { "unknown operator - rule with unknown action is skipped for onExitCodes": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -151,6 +158,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 1"), wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 1"),
wantCountFailed: true, wantCountFailed: true,
wantAction: &failJob,
}, },
"no policy rules": { "no policy rules": {
podFailurePolicy: nil, podFailurePolicy: nil,
@ -201,6 +209,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: false, wantCountFailed: false,
wantAction: &ignore,
}, },
"FailJob rule matched for exit codes": { "FailJob rule matched for exit codes": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -232,6 +241,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 0"), wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 2 matching FailJob rule at index 0"),
wantCountFailed: true, wantCountFailed: true,
wantAction: &failJob,
}, },
"successful containers are skipped by the rules": { "successful containers are skipped by the rules": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -320,6 +330,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 1 matching FailJob rule at index 0"), wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 1 matching FailJob rule at index 0"),
wantCountFailed: true, wantCountFailed: true,
wantAction: &failJob,
}, },
"second jobfail rule matched for exit codes": { "second jobfail rule matched for exit codes": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -358,6 +369,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 6 matching FailJob rule at index 1"), wantJobFailureMessage: pointer.String("Container main-container for pod default/mypod failed with exit code 6 matching FailJob rule at index 1"),
wantCountFailed: true, wantCountFailed: true,
wantAction: &failJob,
}, },
"count rule matched for exit codes": { "count rule matched for exit codes": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -389,6 +401,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: true, wantCountFailed: true,
wantAction: &count,
}, },
"ignore rule matched for pod conditions": { "ignore rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -418,6 +431,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: false, wantCountFailed: false,
wantAction: &ignore,
}, },
"ignore rule matches by the status=False": { "ignore rule matches by the status=False": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -447,6 +461,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: false, wantCountFailed: false,
wantAction: &ignore,
}, },
"ignore rule matches by the status=Unknown": { "ignore rule matches by the status=Unknown": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -476,6 +491,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: false, wantCountFailed: false,
wantAction: &ignore,
}, },
"ignore rule does not match when status for pattern is False, but actual True": { "ignore rule does not match when status for pattern is False, but actual True": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -592,6 +608,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: pointer.String("Pod default/mypod has condition DisruptionTarget matching FailJob rule at index 0"), wantJobFailureMessage: pointer.String("Pod default/mypod has condition DisruptionTarget matching FailJob rule at index 0"),
wantCountFailed: true, wantCountFailed: true,
wantAction: &failJob,
}, },
"count rule matched for pod conditions": { "count rule matched for pod conditions": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -621,6 +638,7 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: true, wantCountFailed: true,
wantAction: &count,
}, },
"no rule matched": { "no rule matched": {
podFailurePolicy: &batch.PodFailurePolicy{ podFailurePolicy: &batch.PodFailurePolicy{
@ -683,25 +701,21 @@ func TestMatchPodFailurePolicy(t *testing.T) {
}, },
wantJobFailureMessage: nil, wantJobFailureMessage: nil,
wantCountFailed: true, wantCountFailed: true,
wantAction: &count,
}, },
} }
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
jobFailMessage, countFailed := matchPodFailurePolicy(tc.podFailurePolicy, tc.failedPod) jobFailMessage, countFailed, action := matchPodFailurePolicy(tc.podFailurePolicy, tc.failedPod)
if tc.wantJobFailureMessage == nil { if diff := cmp.Diff(tc.wantJobFailureMessage, jobFailMessage); diff != "" {
if jobFailMessage != nil { t.Errorf("Unexpected job failure message: %s", diff)
t.Errorf("Unexpected job fail message. Got: %q", *jobFailMessage)
}
} else {
if jobFailMessage == nil {
t.Errorf("Missing job fail message. want: %q", *tc.wantJobFailureMessage)
} else if *tc.wantJobFailureMessage != *jobFailMessage {
t.Errorf("Unexpected job fail message. want: %q. got: %q", *tc.wantJobFailureMessage, *jobFailMessage)
}
} }
if tc.wantCountFailed != countFailed { if tc.wantCountFailed != countFailed {
t.Errorf("Unexpected count failed. want: %v. got: %v", tc.wantCountFailed, countFailed) t.Errorf("Unexpected count failed. want: %v. got: %v", tc.wantCountFailed, countFailed)
} }
if diff := cmp.Diff(tc.wantAction, action); diff != "" {
t.Errorf("Unexpected failure policy action: %s", diff)
}
}) })
} }
} }

View File

@ -68,7 +68,7 @@ type metricLabelsWithValue struct {
Value int Value int
} }
func TestMetrics(t *testing.T) { func TestMetricsOnSuccesses(t *testing.T) {
nonIndexedCompletion := batchv1.NonIndexedCompletion nonIndexedCompletion := batchv1.NonIndexedCompletion
indexedCompletion := batchv1.IndexedCompletion indexedCompletion := batchv1.IndexedCompletion
wFinalizers := true wFinalizers := true
@ -94,7 +94,7 @@ func TestMetrics(t *testing.T) {
}, },
}, },
wantJobFinishedNumMetric: metricLabelsWithValue{ wantJobFinishedNumMetric: metricLabelsWithValue{
Labels: []string{"NonIndexed", "succeeded"}, Labels: []string{"NonIndexed", "succeeded", ""},
Value: 1, Value: 1,
}, },
wantJobPodsFinishedMetric: metricLabelsWithValue{ wantJobPodsFinishedMetric: metricLabelsWithValue{
@ -111,7 +111,7 @@ func TestMetrics(t *testing.T) {
}, },
}, },
wantJobFinishedNumMetric: metricLabelsWithValue{ wantJobFinishedNumMetric: metricLabelsWithValue{
Labels: []string{"Indexed", "succeeded"}, Labels: []string{"Indexed", "succeeded", ""},
Value: 1, Value: 1,
}, },
wantJobPodsFinishedMetric: metricLabelsWithValue{ wantJobPodsFinishedMetric: metricLabelsWithValue{
@ -149,6 +149,146 @@ func TestMetrics(t *testing.T) {
} }
} }
// TestJobFinishedNumReasonMetric verifies that, once a Job has failed, the
// JobFinishedNum counter carries the expected completion_mode, result and
// reason label values. Each test case creates one Job, overwrites the status
// of one of its pods with the configured failed status, waits for the Job to
// be marked failed and then checks the counter.
func TestJobFinishedNumReasonMetric(t *testing.T) {
	wFinalizers := true
	// setup the job controller
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
	defer cancel()

	testCases := map[string]struct {
		job                       batchv1.Job
		podStatus                 v1.PodStatus
		enableJobPodFailurePolicy bool
		wantJobFinishedNumMetric  metricLabelsWithValue
	}{
		"non-indexed job; failed pod handled by FailJob action; JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Completions:  pointer.Int32(1),
					Parallelism:  pointer.Int32(1),
					BackoffLimit: pointer.Int32(1),
					PodFailurePolicy: &batchv1.PodFailurePolicy{
						Rules: []batchv1.PodFailurePolicyRule{
							{
								Action: batchv1.PodFailurePolicyActionFailJob,
								OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
									Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
									Values:   []int32{5},
								},
							},
						},
					},
				},
			},
			// Exit code 5 is listed in the FailJob rule above.
			podStatus: v1.PodStatus{
				Phase: v1.PodFailed,
				ContainerStatuses: []v1.ContainerStatus{
					{
						State: v1.ContainerState{
							Terminated: &v1.ContainerStateTerminated{
								ExitCode: 5,
							},
						},
					},
				},
			},
			wantJobFinishedNumMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "PodFailurePolicy"},
				Value:  1,
			},
		},
		"non-indexed job; failed pod handled by Count action; JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Completions:  pointer.Int32(1),
					Parallelism:  pointer.Int32(1),
					BackoffLimit: pointer.Int32(0),
					PodFailurePolicy: &batchv1.PodFailurePolicy{
						Rules: []batchv1.PodFailurePolicyRule{
							{
								Action: batchv1.PodFailurePolicyActionCount,
								OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
									Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
									Values:   []int32{5},
								},
							},
						},
					},
				},
			},
			// Exit code 5 is listed in the Count rule above.
			podStatus: v1.PodStatus{
				Phase: v1.PodFailed,
				ContainerStatuses: []v1.ContainerStatus{
					{
						State: v1.ContainerState{
							Terminated: &v1.ContainerStateTerminated{
								ExitCode: 5,
							},
						},
					},
				},
			},
			wantJobFinishedNumMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "BackoffLimitExceeded"},
				Value:  1,
			},
		},
		// enableJobPodFailurePolicy is left at its zero value here, so the
		// JobPodFailurePolicy feature gate is set to false for this case.
		"non-indexed job; failed": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Completions:  pointer.Int32(1),
					Parallelism:  pointer.Int32(1),
					BackoffLimit: pointer.Int32(0),
				},
			},
			podStatus: v1.PodStatus{
				Phase: v1.PodFailed,
			},
			wantJobFinishedNumMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "BackoffLimitExceeded"},
				Value:  1,
			},
		},
	}
	jobIndex := 0 // job index to avoid collisions between job names created by different test cases
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
			// Start every case from cleared counters so the expected
			// value refers to this case's Job only.
			resetMetrics()

			// create a single job and wait for its completion
			job := tc.job.DeepCopy()
			job.Name = fmt.Sprintf("job-%v", jobIndex)
			jobIndex++
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, job)
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active: int(*jobObj.Spec.Parallelism),
				Ready:  pointer.Int32(0),
			}, wFinalizers)

			// Overwrite the pod status with the failed status of the case.
			op := func(p *v1.Pod) bool {
				p.Status = tc.podStatus
				return true
			}
			if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
				t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
			}

			validateJobFailed(ctx, t, clientSet, jobObj)

			// verify metric values after the job is finished
			validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
		})
	}
}
func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) { func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
t.Helper() t.Helper()
var cmpErr error var cmpErr error
@ -351,6 +491,13 @@ func TestJobPodFailurePolicy(t *testing.T) {
}, },
}, },
}, },
{
Action: batchv1.PodFailurePolicyActionCount,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{10},
},
},
{ {
Action: batchv1.PodFailurePolicyActionFailJob, Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{ OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
@ -375,6 +522,19 @@ func TestJobPodFailurePolicy(t *testing.T) {
}, },
}, },
} }
podStatusMatchingOnExitCodesCountRule := v1.PodStatus{
Phase: v1.PodFailed,
ContainerStatuses: []v1.ContainerStatus{
{
Name: "main-container",
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 10,
},
},
},
},
}
podStatusMatchingOnPodConditionsIgnoreRule := v1.PodStatus{ podStatusMatchingOnPodConditionsIgnoreRule := v1.PodStatus{
Phase: v1.PodFailed, Phase: v1.PodFailed,
Conditions: []v1.PodCondition{ Conditions: []v1.PodCondition{
@ -384,14 +544,18 @@ func TestJobPodFailurePolicy(t *testing.T) {
}, },
}, },
} }
podStatusNotMatchingAnyRule := v1.PodStatus{
Phase: v1.PodFailed,
}
testCases := map[string]struct { testCases := map[string]struct {
enableJobPodFailurePolicy bool enableJobPodFailurePolicy bool
restartController bool restartController bool
job batchv1.Job job batchv1.Job
podStatus v1.PodStatus podStatus v1.PodStatus
wantActive int wantActive int
wantFailed int wantFailed int
wantJobConditionType batchv1.JobConditionType wantJobConditionType batchv1.JobConditionType
wantPodFailuresHandledByPolicyRuleMetric *metricLabelsWithValue
}{ }{
"pod status matching the configured FailJob rule on exit codes; job terminated when JobPodFailurePolicy enabled": { "pod status matching the configured FailJob rule on exit codes; job terminated when JobPodFailurePolicy enabled": {
enableJobPodFailurePolicy: true, enableJobPodFailurePolicy: true,
@ -400,6 +564,10 @@ func TestJobPodFailurePolicy(t *testing.T) {
wantActive: 0, wantActive: 0,
wantFailed: 1, wantFailed: 1,
wantJobConditionType: batchv1.JobFailed, wantJobConditionType: batchv1.JobFailed,
wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
Labels: []string{"FailJob"},
Value: 1,
},
}, },
"pod status matching the configured FailJob rule on exit codes; with controller restart; job terminated when JobPodFailurePolicy enabled": { "pod status matching the configured FailJob rule on exit codes; with controller restart; job terminated when JobPodFailurePolicy enabled": {
enableJobPodFailurePolicy: true, enableJobPodFailurePolicy: true,
@ -425,11 +593,40 @@ func TestJobPodFailurePolicy(t *testing.T) {
wantActive: 1, wantActive: 1,
wantFailed: 0, wantFailed: 0,
wantJobConditionType: batchv1.JobComplete, wantJobConditionType: batchv1.JobComplete,
wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
Labels: []string{"Ignore"},
Value: 1,
},
},
"pod status matching the configured Count rule on exit codes; pod failure counted when JobPodFailurePolicy enabled": {
enableJobPodFailurePolicy: true,
job: job,
podStatus: podStatusMatchingOnExitCodesCountRule,
wantActive: 1,
wantFailed: 1,
wantJobConditionType: batchv1.JobComplete,
wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
Labels: []string{"Count"},
Value: 1,
},
},
"pod status non-matching any configured rule; pod failure counted when JobPodFailurePolicy enabled": {
enableJobPodFailurePolicy: true,
job: job,
podStatus: podStatusNotMatchingAnyRule,
wantActive: 1,
wantFailed: 1,
wantJobConditionType: batchv1.JobComplete,
wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
Labels: []string{"Count"},
Value: 0,
},
}, },
} }
for name, test := range testCases { for name, test := range testCases {
for _, wFinalizers := range []bool{false, true} { for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) { t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
resetMetrics()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()
@ -475,6 +672,9 @@ func TestJobPodFailurePolicy(t *testing.T) {
} }
} }
validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType) validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
if wFinalizers && test.wantPodFailuresHandledByPolicyRuleMetric != nil {
validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
}
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}) })
} }
@ -1699,6 +1899,7 @@ func resetMetrics() {
metrics.TerminatedPodsTrackingFinalizerTotal.Reset() metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
metrics.JobFinishedNum.Reset() metrics.JobFinishedNum.Reset()
metrics.JobPodsFinished.Reset() metrics.JobPodsFinished.Reset()
metrics.PodFailuresHandledByFailurePolicy.Reset()
} }
func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) { func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {