count ready pods when deleting active pods for failed jobs

This commit is contained in:
Dejan Zele Pejchev 2024-06-17 14:55:13 +02:00
parent fbaf9b0353
commit 11b6e4c404
No known key found for this signature in database
GPG Key ID: 8A900F09C964845E
2 changed files with 81 additions and 13 deletions

View File

@ -140,6 +140,7 @@ type syncJobCtx struct {
uncounted *uncountedTerminatedPods
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
terminating *int32
ready int32
}
// NewController creates a new Job controller that keeps the relevant pods
@ -811,10 +812,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if err != nil {
return err
}
activePods := controller.FilterActivePods(logger, pods)
jobCtx := &syncJobCtx{
job: &job,
pods: pods,
activePods: controller.FilterActivePods(logger, pods),
activePods: activePods,
ready: countReadyPods(activePods),
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
}
@ -825,7 +828,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
ready := ptr.To(countReadyPods(jobCtx.activePods))
// Job first start. Set StartTime only if the job is not in the suspended state.
if job.Status.StartTime == nil && !jobSuspended(&job) {
@ -887,7 +889,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
suspendCondChanged := false
// Remove active pods if Job failed.
if jobCtx.finishedCondition != nil {
deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
deletedReady, deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
if deleted != active || !satisfiedExpectations {
// Can't declare the Job as finished yet, as there might be remaining
// pod finalizers or pods that are not in the informer's cache yet.
@ -897,6 +899,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if trackTerminatingPods(jobCtx.job) {
*jobCtx.terminating += deleted
}
jobCtx.ready -= deletedReady
manageJobErr = err
} else {
manageJobCalled := false
@ -958,10 +961,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
terminating = jobCtx.terminating
}
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(&jobCtx.ready, job.Status.Ready)
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
job.Status.Active = active
job.Status.Ready = ready
job.Status.Ready = &jobCtx.ready
job.Status.Terminating = terminating
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
if err != nil {
@ -975,10 +978,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// This is done through DELETE calls that set deletion timestamps.
// The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
// which the objects can actually be deleted.
// Returns number of successfully deletions issued.
func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) {
// Returns number of successfully deleted ready pods and total number of successfully deleted pods.
func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, int32, error) {
errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods))
var deletedReady int32 = 0
wg := sync.WaitGroup{}
wg.Add(len(pods))
for i := range pods {
@ -989,10 +993,13 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods
errCh <- err
utilruntime.HandleError(err)
}
if podutil.IsPodReady(pod) {
atomic.AddInt32(&deletedReady, 1)
}
}(pods[i])
}
wg.Wait()
return successfulDeletes, errorFromChannel(errCh)
return deletedReady, successfulDeletes, errorFromChannel(errCh)
}
func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
@ -1008,11 +1015,12 @@ func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
return result
}
// deleteJobPods deletes the pods, returns the number of successful removals
// deleteJobPods deletes the pods, returns the number of successful removals of ready pods and total number of successful pod removals
// and any error.
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, int32, error) {
errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods))
var deletedReady int32 = 0
logger := klog.FromContext(ctx)
failDelete := func(pod *v1.Pod, err error) {
@ -1040,10 +1048,13 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil {
failDelete(pod, err)
}
if podutil.IsPodReady(pod) {
atomic.AddInt32(&deletedReady, 1)
}
}(pods[i])
}
wg.Wait()
return successfulDeletes, errorFromChannel(errCh)
return deletedReady, successfulDeletes, errorFromChannel(errCh)
}
// trackJobStatusAndRemoveFinalizers does:
@ -1507,11 +1518,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if trackTerminatingPods(job) {
*jobCtx.terminating += removed
}
jobCtx.ready -= removedReady
return active, metrics.JobSyncActionPodsDeleted, err
}
@ -1548,11 +1560,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if trackTerminatingPods(job) {
*jobCtx.terminating += removed
}
jobCtx.ready -= removedReady
// While it is possible for a Job to require both pod creations and
// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
// restrict ourselves to either just pod deletion or pod creation in any

View File

@ -520,6 +520,17 @@ func TestControllerSyncJob(t *testing.T) {
expectedActive: 3,
expectedReady: ptr.To[int32](0),
},
"count ready pods when too many active pods": {
parallelism: 2,
completions: 5,
backoffLimit: 6,
activePods: 3,
readyPods: 3,
expectedDeletions: 1,
expectedActive: 2,
expectedPodPatches: 1,
expectedReady: ptr.To[int32](2),
},
"failed + succeed pods: reset backoff delay": {
parallelism: 2,
completions: 5,
@ -799,6 +810,21 @@ func TestControllerSyncJob(t *testing.T) {
expectedDeletions: 1,
expectedTerminating: ptr.To[int32](1),
},
"count ready pods when job fails": {
parallelism: 2,
completions: 3,
backoffLimit: 0,
activePods: 2,
readyPods: 2,
failedPods: 1,
expectedFailed: 3,
expectedCondition: &jobConditionFailed,
expectedConditionStatus: v1.ConditionTrue,
expectedConditionReason: "BackoffLimitExceeded",
expectedPodPatches: 3,
expectedReady: ptr.To[int32](0),
expectedDeletions: 2,
},
"indexed job repeated completed index": {
parallelism: 2,
completions: 3,
@ -961,6 +987,22 @@ func TestControllerSyncJob(t *testing.T) {
expectedReady: ptr.To[int32](0),
expectedTerminating: ptr.To[int32](2),
},
"count ready pods when suspending a job with satisfied expectations": {
suspend: true,
parallelism: 2,
activePods: 2, // parallelism == active, expectations satisfied
readyPods: 2,
completions: 4,
backoffLimit: 6,
expectedCreations: 0,
expectedDeletions: 2,
expectedActive: 0,
expectedCondition: &jobConditionSuspended,
expectedConditionStatus: v1.ConditionTrue,
expectedConditionReason: "JobSuspended",
expectedPodPatches: 2,
expectedReady: ptr.To[int32](0),
},
"suspending a job with unsatisfied expectations": {
// Unlike the previous test, we expect the controller to NOT suspend the
// Job in the syncJob call because the controller will wait for
@ -977,6 +1019,19 @@ func TestControllerSyncJob(t *testing.T) {
expectedActive: 3,
expectedReady: ptr.To[int32](0),
},
"count ready pods when suspending a job with unsatisfied expectations": {
suspend: true,
parallelism: 2,
activePods: 3, // active > parallelism, expectations unsatisfied
readyPods: 3,
fakeExpectationAtCreation: -1, // the controller is expecting a deletion
completions: 4,
backoffLimit: 6,
expectedCreations: 0,
expectedDeletions: 0,
expectedActive: 3,
expectedReady: ptr.To[int32](3),
},
"suspending a job with unsatisfied expectations; PodReplacementPolicy enabled": {
suspend: true,
parallelism: 2,