Mirror of https://github.com/k3s-io/kubernetes.git
Merge pull request #125546 from dejanzele/feat/count-ready-pods-for-failed-jobs
Update the count of ready pods when deleting pods
commit 95e16917f7
@@ -140,6 +140,7 @@ type syncJobCtx struct {
 	uncounted                       *uncountedTerminatedPods
 	podsWithDelayedDeletionPerIndex map[int]*v1.Pod
 	terminating                     *int32
+	ready                           int32
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
@@ -811,10 +812,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	if err != nil {
 		return err
 	}
+	activePods := controller.FilterActivePods(logger, pods)
 	jobCtx := &syncJobCtx{
 		job:                  &job,
 		pods:                 pods,
-		activePods:           controller.FilterActivePods(logger, pods),
+		activePods:           activePods,
+		ready:                countReadyPods(activePods),
 		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
 		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
 	}
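The new ready field is initialized from countReadyPods, an existing helper in the Job controller (pkg/controller/job/job_controller.go) that is not shown in this diff. A minimal sketch of what that helper is assumed to do, namely count the pods whose PodReady condition is True:

// Sketch only (not part of this diff): the assumed behavior of countReadyPods.
// v1 is k8s.io/api/core/v1 and podutil is k8s.io/kubernetes/pkg/api/v1/pod,
// matching imports the Job controller already uses.
func countReadyPods(pods []*v1.Pod) int32 {
	cnt := int32(0)
	for _, p := range pods {
		if podutil.IsPodReady(p) { // PodReady condition is True
			cnt++
		}
	}
	return cnt
}

Caching this count in syncJobCtx lets later deletion paths adjust it in place instead of recomputing it from the pod list after every delete.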
@@ -825,7 +828,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
 	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
 	jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
-	ready := ptr.To(countReadyPods(jobCtx.activePods))
 
 	// Job first start. Set StartTime only if the job is not in the suspended state.
 	if job.Status.StartTime == nil && !jobSuspended(&job) {
@@ -887,7 +889,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	suspendCondChanged := false
 	// Remove active pods if Job failed.
 	if jobCtx.finishedCondition != nil {
-		deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
+		deletedReady, deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
 		if deleted != active || !satisfiedExpectations {
 			// Can't declare the Job as finished yet, as there might be remaining
 			// pod finalizers or pods that are not in the informer's cache yet.
@@ -897,6 +899,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		if trackTerminatingPods(jobCtx.job) {
 			*jobCtx.terminating += deleted
 		}
+		jobCtx.ready -= deletedReady
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
@@ -958,10 +961,10 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
 		terminating = jobCtx.terminating
 	}
-	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
+	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(&jobCtx.ready, job.Status.Ready)
 	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
 	job.Status.Active = active
-	job.Status.Ready = ready
+	job.Status.Ready = &jobCtx.ready
 	job.Status.Terminating = terminating
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
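The status comparison now uses the ready count tracked in jobCtx rather than a value recomputed with ptr.To just before the update. For context, a small self-contained illustration of the k8s.io/utils/ptr helpers involved; the variable names here are stand-ins invented for the example:

package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	current := int32(2)    // stand-in for jobCtx.ready
	var statusReady *int32 // stand-in for job.Status.Ready, which may be nil

	// ptr.Equal reports true only when both pointers are nil or both point
	// to equal values, so a nil Status.Ready still triggers a status update.
	fmt.Println(ptr.Equal(&current, statusReady)) // false

	statusReady = ptr.To(int32(2))
	fmt.Println(ptr.Equal(&current, statusReady)) // true
}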
@@ -975,10 +978,11 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 // This is done through DELETE calls that set deletion timestamps.
 // The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
 // which the objects can actually be deleted.
-// Returns number of successfully deletions issued.
-func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, error) {
+// Returns number of successfully deleted ready pods and total number of successfully deleted pods.
+func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods []*v1.Pod) (int32, int32, error) {
 	errCh := make(chan error, len(pods))
 	successfulDeletes := int32(len(pods))
+	var deletedReady int32 = 0
 	wg := sync.WaitGroup{}
 	wg.Add(len(pods))
 	for i := range pods {
@@ -989,10 +993,13 @@ func (jm *Controller) deleteActivePods(ctx context.Context, job *batch.Job, pods
 				errCh <- err
 				utilruntime.HandleError(err)
 			}
+			if podutil.IsPodReady(pod) {
+				atomic.AddInt32(&deletedReady, 1)
+			}
 		}(pods[i])
 	}
 	wg.Wait()
-	return successfulDeletes, errorFromChannel(errCh)
+	return deletedReady, successfulDeletes, errorFromChannel(errCh)
 }
 
 func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
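deleteActivePods (above) and deleteJobPods (below) now share the same pattern: delete pods concurrently and tally ready pods with an atomic counter so the caller can decrement jobCtx.ready. A self-contained sketch of that pattern; the pod type and the deletion step are simplified stand-ins, not the controller's real types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pod is a stand-in for *v1.Pod; ready plays the role of podutil.IsPodReady.
type pod struct{ ready bool }

// deleteAll mirrors the shape of deleteActivePods/deleteJobPods: it returns
// how many of the deleted pods were ready and how many deletions were issued.
func deleteAll(pods []pod) (deletedReady, deleted int32) {
	deleted = int32(len(pods))
	var wg sync.WaitGroup
	wg.Add(len(pods))
	for i := range pods {
		go func(p pod) {
			defer wg.Done()
			// The real code issues the DELETE call here and, on failure,
			// decrements the success count and records the error.
			if p.ready {
				atomic.AddInt32(&deletedReady, 1)
			}
		}(pods[i])
	}
	wg.Wait()
	return deletedReady, deleted
}

func main() {
	fmt.Println(deleteAll([]pod{{ready: true}, {ready: false}, {ready: true}})) // 2 3
}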
@@ -1008,11 +1015,12 @@ func nonIgnoredFailedPodsCount(jobCtx *syncJobCtx, failedPods []*v1.Pod) int {
 	return result
 }
 
-// deleteJobPods deletes the pods, returns the number of successful removals
+// deleteJobPods deletes the pods, returns the number of successful removals of ready pods and total number of successful pod removals
 // and any error.
-func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
+func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey string, pods []*v1.Pod) (int32, int32, error) {
 	errCh := make(chan error, len(pods))
 	successfulDeletes := int32(len(pods))
+	var deletedReady int32 = 0
 	logger := klog.FromContext(ctx)
 
 	failDelete := func(pod *v1.Pod, err error) {
@@ -1040,10 +1048,13 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
 			if err := jm.podControl.DeletePod(ctx, job.Namespace, pod.Name, job); err != nil {
 				failDelete(pod, err)
 			}
+			if podutil.IsPodReady(pod) {
+				atomic.AddInt32(&deletedReady, 1)
+			}
 		}(pods[i])
 	}
 	wg.Wait()
-	return successfulDeletes, errorFromChannel(errCh)
+	return deletedReady, successfulDeletes, errorFromChannel(errCh)
 }
 
 // trackJobStatusAndRemoveFinalizers does:
@@ -1507,11 +1518,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
 		podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
 		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
-		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
+		removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
 		if trackTerminatingPods(job) {
 			*jobCtx.terminating += removed
 		}
+		jobCtx.ready -= removedReady
 		return active, metrics.JobSyncActionPodsDeleted, err
 	}
 
@@ -1548,11 +1560,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 	if len(podsToDelete) > 0 {
 		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
 		logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
-		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
+		removedReady, removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
 		if trackTerminatingPods(job) {
 			*jobCtx.terminating += removed
 		}
+		jobCtx.ready -= removedReady
 		// While it is possible for a Job to require both pod creations and
 		// deletions at the same time (e.g. indexed Jobs with repeated indexes), we
 		// restrict ourselves to either just pod deletion or pod creation in any
@@ -520,6 +520,17 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedActive:    3,
 			expectedReady:     ptr.To[int32](0),
 		},
+		"count ready pods when too many active pods": {
+			parallelism:        2,
+			completions:        5,
+			backoffLimit:       6,
+			activePods:         3,
+			readyPods:          3,
+			expectedDeletions:  1,
+			expectedActive:     2,
+			expectedPodPatches: 1,
+			expectedReady:      ptr.To[int32](2),
+		},
 		"failed + succeed pods: reset backoff delay": {
 			parallelism:  2,
 			completions:  5,
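The new table entry drives TestControllerSyncJob in pkg/controller/job/job_controller_test.go: with parallelism 2 but 3 active/ready pods, the controller deletes one pod, leaving 2 active and 2 ready, which is why expectedReady is ptr.To[int32](2). The harness's assertion is not part of this diff; a hypothetical sketch of how expectedReady might be checked against the synced Job status:

// Hypothetical sketch only; actual, tc, and the error message are illustrative.
if !ptr.Equal(actual.Status.Ready, tc.expectedReady) {
	t.Errorf("Unexpected number of ready pods. Expected %v, saw %v", tc.expectedReady, actual.Status.Ready)
}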
@@ -799,6 +810,21 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedDeletions:   1,
 			expectedTerminating: ptr.To[int32](1),
 		},
+		"count ready pods when job fails": {
+			parallelism:             2,
+			completions:             3,
+			backoffLimit:            0,
+			activePods:              2,
+			readyPods:               2,
+			failedPods:              1,
+			expectedFailed:          3,
+			expectedCondition:       &jobConditionFailed,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "BackoffLimitExceeded",
+			expectedPodPatches:      3,
+			expectedReady:           ptr.To[int32](0),
+			expectedDeletions:       2,
+		},
 		"indexed job repeated completed index": {
 			parallelism: 2,
 			completions: 3,
@@ -961,6 +987,22 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedReady:           ptr.To[int32](0),
 			expectedTerminating:     ptr.To[int32](2),
 		},
+		"count ready pods when suspending a job with satisfied expectations": {
+			suspend:                 true,
+			parallelism:             2,
+			activePods:              2, // parallelism == active, expectations satisfied
+			readyPods:               2,
+			completions:             4,
+			backoffLimit:            6,
+			expectedCreations:       0,
+			expectedDeletions:       2,
+			expectedActive:          0,
+			expectedCondition:       &jobConditionSuspended,
+			expectedConditionStatus: v1.ConditionTrue,
+			expectedConditionReason: "JobSuspended",
+			expectedPodPatches:      2,
+			expectedReady:           ptr.To[int32](0),
+		},
 		"suspending a job with unsatisfied expectations": {
 			// Unlike the previous test, we expect the controller to NOT suspend the
 			// Job in the syncJob call because the controller will wait for
@@ -977,6 +1019,19 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedActive:            3,
 			expectedReady:             ptr.To[int32](0),
 		},
+		"count ready pods when suspending a job with unsatisfied expectations": {
+			suspend:                   true,
+			parallelism:               2,
+			activePods:                3, // active > parallelism, expectations unsatisfied
+			readyPods:                 3,
+			fakeExpectationAtCreation: -1, // the controller is expecting a deletion
+			completions:               4,
+			backoffLimit:              6,
+			expectedCreations:         0,
+			expectedDeletions:         0,
+			expectedActive:            3,
+			expectedReady:             ptr.To[int32](3),
+		},
 		"suspending a job with unsatisfied expectations; PodReplacementPolicy enabled": {
 			suspend:     true,
 			parallelism: 2,