diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 179c01d4463..b34860bef66 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -140,6 +140,7 @@ type syncJobCtx struct { uncounted *uncountedTerminatedPods podsWithDelayedDeletionPerIndex map[int]*v1.Pod terminating *int32 + ready int32 } // NewController creates a new Job controller that keeps the relevant pods @@ -811,10 +812,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if err != nil { return err } + activePods := controller.FilterActivePods(logger, pods) jobCtx := &syncJobCtx{ job: &job, pods: pods, - activePods: controller.FilterActivePods(logger, pods), + activePods: activePods, + ready: countReadyPods(activePods), uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key), } @@ -825,7 +828,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx) jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded)) jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed)) - ready := ptr.To(countReadyPods(jobCtx.activePods)) // Job first start. Set StartTime only if the job is not in the suspended state. if job.Status.StartTime == nil && !jobSuspended(&job) { @@ -887,7 +889,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { suspendCondChanged := false // Remove active pods if Job failed. if jobCtx.finishedCondition != nil { - deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods) + deletedReady, deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods) if deleted != active || !satisfiedExpectations { // Can't declare the Job as finished yet, as there might be remaining // pod finalizers or pods that are not in the informer's cache yet. @@ -897,6 +899,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if trackTerminatingPods(jobCtx.job) { *jobCtx.terminating += deleted } + jobCtx.ready -= deletedReady manageJobErr = err } else { manageJobCalled := false @@ -958,10 +961,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { terminating = jobCtx.terminating } - needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready) + needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(&jobCtx.ready, job.Status.Ready) needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating) job.Status.Active = active - job.Status.Ready = ready + job.Status.Ready = &jobCtx.ready job.Status.Terminating = terminating err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate) if err != nil { @@ -975,10 +978,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { // This is done through DELETE calls that set deletion timestamps. // The method trackJobStatusAndRemoveFinalizers removes the finalizers, after // which the objects can actually be deleted. -// Returns number of successfully deletions issued. -func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) { +// Returns number of successfully deleted ready pods and total number of successfully deleted pods. +func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, int32, error) { errCh := make(chan error, len(pods)) successfulDeletes := int32(len(pods)) + var deletedReady int32 = 0 wg := sync.WaitGroup{} wg.Add(len(pods)) for i := range pods { @@ -989,10 +993,13 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods errCh <- err utilruntime.HandleError(err) } + if podutil.IsPodReady(pod) { + atomic.AddInt32(&deletedReady, 1) + } }(pods[i]) } wg.Wait() - return successfulDeletes, errorFromChannel(errCh) + return deletedReady, successfulDeletes, errorFromChannel(errCh) } func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int { @@ -1008,11 +1015,12 @@ func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int { return result } -// deleteJobPods deletes the pods, returns the number of successful removals +// deleteJobPods deletes the pods, returns the number of successful removals of ready pods and total number of successful pod removals // and any error. -func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) { +func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, int32, error) { errCh := make(chan error, len(pods)) successfulDeletes := int32(len(pods)) + var deletedReady int32 = 0 logger := klog.FromContext(ctx) failDelete := func(pod *v1.Pod, err error) { @@ -1040,10 +1048,13 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil { failDelete(pod, err) } + if podutil.IsPodReady(pod) { + atomic.AddInt32(&deletedReady, 1) + } }(pods[i]) } wg.Wait() - return successfulDeletes, errorFromChannel(errCh) + return deletedReady, successfulDeletes, errorFromChannel(errCh) } // trackJobStatusAndRemoveFinalizers does: @@ -1507,11 +1518,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active)) jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete)) - removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) + removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed if trackTerminatingPods(job) { *jobCtx.terminating += removed } + jobCtx.ready -= removedReady return active, metrics.JobSyncActionPodsDeleted, err } @@ -1548,11 +1560,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete)) logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) - removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) + removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed if trackTerminatingPods(job) { *jobCtx.terminating += removed } + jobCtx.ready -= removedReady // While it is possible for a Job to require both pod creations and // deletions at the same time (e.g. indexed Jobs with repeated indexes), we // restrict ourselves to either just pod deletion or pod creation in any diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 21bf17fc5bb..fc6bd1c5982 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -520,6 +520,17 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 3, expectedReady: ptr.To[int32](0), }, + "count ready pods when too many active pods": { + parallelism: 2, + completions: 5, + backoffLimit: 6, + activePods: 3, + readyPods: 3, + expectedDeletions: 1, + expectedActive: 2, + expectedPodPatches: 1, + expectedReady: ptr.To[int32](2), + }, "failed + succeed pods: reset backoff delay": { parallelism: 2, completions: 5, @@ -799,6 +810,21 @@ func TestControllerSyncJob(t *testing.T) { expectedDeletions: 1, expectedTerminating: ptr.To[int32](1), }, + "count ready pods when job fails": { + parallelism: 2, + completions: 3, + backoffLimit: 0, + activePods: 2, + readyPods: 2, + failedPods: 1, + expectedFailed: 3, + expectedCondition: &jobConditionFailed, + expectedConditionStatus: v1.ConditionTrue, + expectedConditionReason: "BackoffLimitExceeded", + expectedPodPatches: 3, + expectedReady: ptr.To[int32](0), + expectedDeletions: 2, + }, "indexed job repeated completed index": { parallelism: 2, completions: 3, @@ -961,6 +987,22 @@ func TestControllerSyncJob(t *testing.T) { expectedReady: ptr.To[int32](0), expectedTerminating: ptr.To[int32](2), }, + "count ready pods when suspending a job with satisfied expectations": { + suspend: true, + parallelism: 2, + activePods: 2, // parallelism == active, expectations satisfied + readyPods: 2, + completions: 4, + backoffLimit: 6, + expectedCreations: 0, + expectedDeletions: 2, + expectedActive: 0, + expectedCondition: &jobConditionSuspended, + expectedConditionStatus: v1.ConditionTrue, + expectedConditionReason: "JobSuspended", + expectedPodPatches: 2, + expectedReady: ptr.To[int32](0), + }, "suspending a job with unsatisfied expectations": { // Unlike the previous test, we expect the controller to NOT suspend the // Job in the syncJob call because the controller will wait for @@ -977,6 +1019,19 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 3, expectedReady: ptr.To[int32](0), }, + "count ready pods when suspending a job with unsatisfied expectations": { + suspend: true, + parallelism: 2, + activePods: 3, // active > parallelism, expectations unsatisfied + readyPods: 3, + fakeExpectationAtCreation: -1, // the controller is expecting a deletion + completions: 4, + backoffLimit: 6, + expectedCreations: 0, + expectedDeletions: 0, + expectedActive: 3, + expectedReady: ptr.To[int32](3), + }, "suspending a job with unsatisfied expectations; PodReplacementPolicy enabled": { suspend: true, parallelism: 2,