count terminating pods when deleting active pods for failed jobs

This commit is contained in:
Dejan Pejchev 2024-05-29 00:10:27 +02:00
parent bc8ec4f9aa
commit 7dd2948620
No known key found for this signature in database
GPG Key ID: 8A900F09C964845E
3 changed files with 55 additions and 14 deletions

View File

@ -899,6 +899,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
jobCtx.finishedCondition = nil
}
active -= deleted
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
*jobCtx.terminating += deleted
}
manageJobErr = err
} else {
manageJobCalled := false
@ -1504,6 +1507,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
*jobCtx.terminating += removed
}
return active, metrics.JobSyncActionPodsDeleted, err
}
@ -1553,6 +1559,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
*jobCtx.terminating += removed
}
// While it is possible for a Job to require both pod creations and
// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
// restrict ourselves to either just pod deletion or pod creation in any

View File

@ -400,10 +400,10 @@ func TestControllerSyncJob(t *testing.T) {
backoffLimit: 6,
activePods: 2,
failedPods: 0,
terminatingPods: 4,
terminatingPods: 5,
podReplacementPolicy: podReplacementPolicy(batch.Failed),
jobPodReplacementPolicy: true,
expectedTerminating: ptr.To[int32](4),
expectedTerminating: ptr.To[int32](6),
expectedReady: ptr.To[int32](0),
expectedActive: 1,
expectedDeletions: 1,
@ -3640,7 +3640,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](2),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
@ -3697,7 +3697,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
@ -3746,7 +3746,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
CompletedIndexes: "1",
FailedIndexes: ptr.To(""),
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
@ -3868,7 +3868,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
wantStatus: batch.JobStatus{
Failed: 1,
Succeeded: 1,
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](2),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
@ -3931,7 +3931,7 @@ func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
wantStatus: batch.JobStatus{
Failed: 2,
Succeeded: 1,
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Conditions: []batch.JobCondition{
@ -4761,7 +4761,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
wantStatus: batch.JobStatus{
Failed: 3,
Succeeded: 1,
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](1),
FailedIndexes: ptr.To("0,2"),
CompletedIndexes: "1",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},

View File

@ -514,6 +514,7 @@ func TestSuccessPolicy(t *testing.T) {
wantActiveIndexes sets.Set[int]
wantCompletedIndexes string
wantFailedIndexes *string
wantTerminating *int32
}
podTemplateSpec := v1.PodTemplateSpec{
@ -561,6 +562,7 @@ func TestSuccessPolicy(t *testing.T) {
wantFailed: 0,
wantSucceeded: 1,
wantCompletedIndexes: "0",
wantTerminating: ptr.To(int32(0)),
},
},
wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
@ -595,6 +597,7 @@ func TestSuccessPolicy(t *testing.T) {
wantFailed: 0,
wantSucceeded: 1,
wantCompletedIndexes: "0",
wantTerminating: ptr.To(int32(0)),
},
},
wantConditionTypes: []batchv1.JobConditionType{batchv1.JobComplete},
@ -630,6 +633,7 @@ func TestSuccessPolicy(t *testing.T) {
wantActiveIndexes: sets.New(0, 1),
wantFailed: 0,
wantSucceeded: 0,
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -640,6 +644,7 @@ func TestSuccessPolicy(t *testing.T) {
wantFailed: 0,
wantSucceeded: 1,
wantCompletedIndexes: "1",
wantTerminating: ptr.To(int32(1)),
},
},
wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
@ -675,6 +680,7 @@ func TestSuccessPolicy(t *testing.T) {
wantActiveIndexes: sets.New(0, 1),
wantFailed: 0,
wantSucceeded: 0,
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -685,6 +691,7 @@ func TestSuccessPolicy(t *testing.T) {
wantFailed: 0,
wantSucceeded: 1,
wantCompletedIndexes: "1",
wantTerminating: ptr.To(int32(1)),
},
},
wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
@ -723,6 +730,7 @@ func TestSuccessPolicy(t *testing.T) {
wantFailed: 1,
wantFailedIndexes: ptr.To("0"),
wantSucceeded: 0,
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -734,6 +742,7 @@ func TestSuccessPolicy(t *testing.T) {
wantSucceeded: 1,
wantFailedIndexes: ptr.To("0"),
wantCompletedIndexes: "1",
wantTerminating: ptr.To(int32(0)),
},
},
wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed},
@ -774,7 +783,7 @@ func TestSuccessPolicy(t *testing.T) {
Succeeded: podTermination.wantSucceeded,
Failed: podTermination.wantFailed,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
Terminating: podTermination.wantTerminating,
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
}
@ -861,7 +870,7 @@ func TestSuccessPolicy_ReEnabling(t *testing.T) {
Active: 0,
Succeeded: 3,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
Terminating: ptr.To[int32](2),
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0-2", nil)
@ -1168,6 +1177,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantCompletedIndexes string
wantFailedIndexes *string
wantReplacementPodFailureCount *int
wantTerminating *int32
}
podTemplateSpec := v1.PodTemplateSpec{
@ -1208,6 +1218,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantActiveIndexes: sets.New(0, 1),
wantFailedIndexes: ptr.To(""),
wantReplacementPodFailureCount: ptr.To(1),
wantTerminating: ptr.To(int32(0)),
},
},
wantJobConditionType: batchv1.JobComplete,
@ -1238,6 +1249,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantActiveIndexes: sets.New(0, 1),
wantFailedIndexes: ptr.To(""),
wantReplacementPodFailureCount: ptr.To(1),
wantTerminating: ptr.To(int32(0)),
},
{
status: v1.PodStatus{
@ -1248,6 +1260,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantActiveIndexes: sets.New(0, 1),
wantFailedIndexes: ptr.To(""),
wantReplacementPodFailureCount: ptr.To(2),
wantTerminating: ptr.To(int32(0)),
},
{
status: v1.PodStatus{
@ -1257,6 +1270,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantFailed: 3,
wantActiveIndexes: sets.New(1),
wantFailedIndexes: ptr.To("0"),
wantTerminating: ptr.To(int32(0)),
},
},
wantJobConditionType: batchv1.JobFailed,
@ -1292,6 +1306,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantFailed: 1,
wantActiveIndexes: sets.New(0, 1, 2),
wantFailedIndexes: ptr.To(""),
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -1302,6 +1317,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantFailed: 2,
wantActiveIndexes: sets.New(0, 1, 2),
wantFailedIndexes: ptr.To(""),
wantTerminating: ptr.To(int32(0)),
},
{
index: 2,
@ -1310,6 +1326,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
},
wantFailed: 5,
wantFailedIndexes: ptr.To(""),
wantTerminating: ptr.To(int32(2)),
},
},
wantJobConditionType: batchv1.JobFailed,
@ -1344,6 +1361,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantFailed: 1,
wantActiveIndexes: sets.New(1),
wantFailedIndexes: ptr.To("0"),
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -1354,6 +1372,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantSucceeded: 1,
wantFailedIndexes: ptr.To("0"),
wantCompletedIndexes: "1",
wantTerminating: ptr.To(int32(0)),
},
},
wantJobConditionType: batchv1.JobFailed,
@ -1389,6 +1408,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantFailed: 1,
wantActiveIndexes: sets.New(1, 2),
wantFailedIndexes: ptr.To("0"),
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -1398,6 +1418,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantActive: 0,
wantFailed: 3,
wantFailedIndexes: ptr.To("0,1"),
wantTerminating: ptr.To(int32(1)),
},
},
wantJobConditionType: batchv1.JobFailed,
@ -1457,6 +1478,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
wantFailed: 1,
wantActiveIndexes: sets.New(1),
wantFailedIndexes: ptr.To("0"),
wantTerminating: ptr.To(int32(0)),
},
{
index: 1,
@ -1471,6 +1493,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
},
wantFailed: 2,
wantFailedIndexes: ptr.To("0,1"),
wantTerminating: ptr.To(int32(0)),
},
},
wantJobConditionType: batchv1.JobFailed,
@ -1517,7 +1540,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
Succeeded: podTermination.wantSucceeded,
Failed: podTermination.wantFailed,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
Terminating: podTermination.wantTerminating,
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
if podTermination.wantReplacementPodFailureCount != nil {
@ -2730,6 +2753,7 @@ func TestElasticIndexedJob(t *testing.T) {
wantFailed int
wantRemainingIndexes sets.Set[int]
wantActivePods int
wantTerminating *int32
}
cases := map[string]struct {
featureGate bool
@ -2739,7 +2763,8 @@ func TestElasticIndexedJob(t *testing.T) {
"feature flag off, mutation not allowed": {
jobUpdates: []jobUpdate{
{
completions: ptr.To[int32](4),
completions: ptr.To[int32](4),
wantTerminating: ptr.To[int32](0),
},
},
wantErr: apierrors.NewInvalid(
@ -2756,6 +2781,7 @@ func TestElasticIndexedJob(t *testing.T) {
completions: ptr.To[int32](4),
succeedIndexes: []int{0, 1, 2, 3},
wantSucceededIndexes: "0-3",
wantTerminating: ptr.To[int32](0),
},
},
},
@ -2770,6 +2796,7 @@ func TestElasticIndexedJob(t *testing.T) {
wantFailed: 1,
wantRemainingIndexes: sets.New(0, 2),
wantActivePods: 2,
wantTerminating: ptr.To[int32](0),
},
// Scale down completions 3->1, verify prev failure out of range still counts
// but succeeded out of range does not.
@ -2778,6 +2805,7 @@ func TestElasticIndexedJob(t *testing.T) {
succeedIndexes: []int{0},
wantSucceededIndexes: "0",
wantFailed: 1,
wantTerminating: ptr.To[int32](0),
},
},
},
@ -2790,18 +2818,21 @@ func TestElasticIndexedJob(t *testing.T) {
wantSucceededIndexes: "2",
wantRemainingIndexes: sets.New(0, 1),
wantActivePods: 2,
wantTerminating: ptr.To[int32](0),
},
// Scale completions down 3->2 to exclude previously succeeded index.
{
completions: ptr.To[int32](2),
wantRemainingIndexes: sets.New(0, 1),
wantActivePods: 2,
wantTerminating: ptr.To[int32](0),
},
// Scale completions back up to include previously succeeded index that was temporarily out of range.
{
completions: ptr.To[int32](3),
succeedIndexes: []int{0, 1, 2},
wantSucceededIndexes: "0-2",
wantTerminating: ptr.To[int32](0),
},
},
},
@ -2809,7 +2840,8 @@ func TestElasticIndexedJob(t *testing.T) {
featureGate: true,
jobUpdates: []jobUpdate{
{
completions: ptr.To[int32](0),
completions: ptr.To[int32](0),
wantTerminating: ptr.To[int32](3),
},
},
},
@ -2887,7 +2919,7 @@ func TestElasticIndexedJob(t *testing.T) {
Succeeded: len(update.succeedIndexes),
Failed: update.wantFailed,
Ready: ptr.To[int32](0),
Terminating: ptr.To[int32](0),
Terminating: update.wantTerminating,
})
validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil)
}