Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-06 10:43:56 +00:00)
Remarks to syncJobCtx

commit bf48165232
parent 990339d4c3
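The change collects the per-sync state that syncJob used to compute into loose locals and assignments into a single syncJobCtx struct, built once and handed to the helpers, so they read jobCtx.job, jobCtx.pods, jobCtx.succeeded, and so on instead of unpacking the context first. Below is a minimal, self-contained Go sketch of that pattern; the types are simplified stand-ins for illustration only, not the real controller types (batch.Job, backoffRecord, orderedIntervals, etc.).

package main

import "fmt"

// Simplified stand-ins for the controller's types.
type job struct {
	name        string
	completions int32
}

type pod struct {
	name   string
	active bool
}

// syncCtx plays the role of syncJobCtx in the diff: one value that carries
// the job, its pods, and derived counts through a single sync pass.
type syncCtx struct {
	job        *job
	pods       []*pod
	activePods []*pod
	succeeded  int32
}

// filterActive mirrors controller.FilterActivePods at a toy scale.
func filterActive(pods []*pod) []*pod {
	var out []*pod
	for _, p := range pods {
		if p.active {
			out = append(out, p)
		}
	}
	return out
}

// manage consumes the context directly (jobCtx.activePods) rather than
// unpacking it into locals, which is the style the review remarks ask for.
func manage(jobCtx *syncCtx) int32 {
	return int32(len(jobCtx.activePods))
}

func main() {
	j := &job{name: "demo", completions: 2}
	pods := []*pod{{name: "a", active: true}, {name: "b", active: false}}
	jobCtx := &syncCtx{
		job:        j,
		pods:       pods,
		activePods: filterActive(pods),
	}
	fmt.Println("active:", manage(jobCtx))
}

The diff below applies this shape to the Job controller: syncJob builds the struct once, and manageJob, trackJobStatusAndRemoveFinalizers, flushUncountedAndRemoveFinalizers, getNewFinishedPods, and getValidPodsWithFilter all take the jobCtx instead of separate arguments or unpacked locals.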
@@ -131,7 +131,7 @@ type Controller struct {
 	podBackoffStore *backoffStore
 }
 
-type syncJobContext struct {
+type syncJobCtx struct {
 	job *batch.Job
 	pods []*v1.Pod
 	finishedCondition *batch.JobCondition
@@ -142,7 +142,6 @@ type syncJobContext struct {
 	newBackoffRecord backoffRecord
 	expectedRmFinalizers sets.Set[string]
 	uncounted *uncountedTerminatedPods
-	podFailureCountByPolicyAction map[string]int
 }
 
 // NewController creates a new Job controller that keeps the relevant pods
@@ -756,9 +755,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return nil
 	}
 
-	syncJobContext := &syncJobContext{}
-	syncJobContext.job = &job
-
 	completionMode := getCompletionMode(&job)
 	action := metrics.JobSyncActionReconciling
 
@@ -775,9 +771,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	if job.Status.UncountedTerminatedPods == nil {
 		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 	}
-	uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-	syncJobContext.uncounted = uncounted
-	syncJobContext.expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
 
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
@@ -789,16 +782,20 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return err
 	}
 
-	syncJobContext.pods = pods
-	activePods := controller.FilterActivePods(pods)
-	syncJobContext.activePods = activePods
-	active := int32(len(activePods))
-	newSucceededPods, newFailedPods := getNewFinishedPods(syncJobContext)
-	syncJobContext.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
-	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
+	jobCtx := &syncJobCtx{
+		job: &job,
+		pods: pods,
+		activePods: controller.FilterActivePods(pods),
+		uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
+		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
+	}
+	active := int32(len(jobCtx.activePods))
+	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
+	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
+	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(jobCtx.uncounted.failed))
 	var ready *int32
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
-		ready = pointer.Int32(countReadyPods(activePods))
+		ready = pointer.Int32(countReadyPods(jobCtx.activePods))
 	}
 
 	// Job first start. Set StartTime only if the job is not in the suspended state.
@@ -807,7 +804,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}
 
-	syncJobContext.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+	jobCtx.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
 	var manageJobErr error
 
@@ -815,19 +812,19 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
-			syncJobContext.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
+			jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
 		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
 			// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-			syncJobContext.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 		}
 	}
-	if syncJobContext.finishedCondition == nil {
+	if jobCtx.finishedCondition == nil {
 		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 			// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 			// OR if the number of failed jobs increased since the last syncJob
-			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
 		} else if jm.pastActiveDeadline(&job) {
-			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
 		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
 			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
 			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
@@ -835,28 +832,25 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		}
 	}
 
-	var prevSucceededIndexes, succeededIndexes orderedIntervals
 	if isIndexedJob(&job) {
-		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
-		syncJobContext.prevSucceededIndexes = prevSucceededIndexes
-		syncJobContext.succeededIndexes = succeededIndexes
-		syncJobContext.succeeded = int32(succeededIndexes.total())
+		jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
+		jobCtx.succeeded = int32(jobCtx.succeededIndexes.total())
 	}
 	suspendCondChanged := false
 	// Remove active pods if Job failed.
-	if syncJobContext.finishedCondition != nil {
-		deleted, err := jm.deleteActivePods(ctx, &job, activePods)
+	if jobCtx.finishedCondition != nil {
+		deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
 		if deleted != active || !satisfiedExpectations {
 			// Can't declare the Job as finished yet, as there might be remaining
 			// pod finalizers or pods that are not in the informer's cache yet.
-			syncJobContext.finishedCondition = nil
+			jobCtx.finishedCondition = nil
 		}
 		active -= deleted
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
 		if satisfiedExpectations && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(ctx, syncJobContext)
+			active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
 			manageJobCalled = true
 		}
 		complete := false
@@ -867,16 +861,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			// not expected to fail, but if they do, the failure is ignored. Once any
 			// pod succeeds, the controller waits for remaining pods to finish, and
 			// then the job is complete.
-			complete = syncJobContext.succeeded > 0 && active == 0
+			complete = jobCtx.succeeded > 0 && active == 0
 		} else {
 			// Job specifies a number of completions. This type of job signals
 			// success by having that number of successes. Since we do not
 			// start more pods than there are remaining completions, there should
 			// not be any remaining active pods once this count is reached.
-			complete = syncJobContext.succeeded >= *job.Spec.Completions && active == 0
+			complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
 		}
 		if complete {
-			syncJobContext.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
 		} else if manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
 			// this syncJob. Otherwise wait for the right syncJob call to make
@@ -912,7 +906,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	err = jm.trackJobStatusAndRemoveFinalizers(ctx, syncJobContext, needsStatusUpdate)
+	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -991,66 +985,60 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
 //
 // It does this up to a limited number of Pods so that the size of .status
 // doesn't grow too much and this sync doesn't starve other Jobs.
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, needsFlush bool) error {
+func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error {
 	logger := klog.FromContext(ctx)
-	job := syncJobContext.job
-	pods := syncJobContext.pods
-	finishedCond := syncJobContext.finishedCondition
-	expectedRmFinalizers := syncJobContext.expectedRmFinalizers
-	succeededIndexes := syncJobContext.succeededIndexes
-	uncounted := syncJobContext.uncounted
 
-	isIndexed := isIndexedJob(job)
+	isIndexed := isIndexedJob(jobCtx.job)
 	var podsToRemoveFinalizer []*v1.Pod
-	uncountedStatus := job.Status.UncountedTerminatedPods
+	uncountedStatus := jobCtx.job.Status.UncountedTerminatedPods
 	var newSucceededIndexes []int
 	if isIndexed {
 		// Sort to introduce completed Indexes in order.
-		sort.Sort(byCompletionIndex(pods))
+		sort.Sort(byCompletionIndex(jobCtx.pods))
 	}
-	uidsWithFinalizer := make(sets.Set[string], len(pods))
-	for _, p := range pods {
+	uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods))
+	for _, p := range jobCtx.pods {
 		uid := string(p.UID)
-		if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) {
+		if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) {
 			uidsWithFinalizer.Insert(uid)
 		}
 	}
 
 	// Shallow copy, as it will only be used to detect changes in the counters.
-	oldCounters := job.Status
-	if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
+	oldCounters := jobCtx.job.Status
+	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
 		needsFlush = true
 	}
 	podFailureCountByPolicyAction := map[string]int{}
 	reachedMaxUncountedPods := false
-	for _, pod := range pods {
-		if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) {
+	for _, pod := range jobCtx.pods {
+		if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) {
			// This pod was processed in a previous sync.
			continue
 		}
-		considerPodFailed := isPodFailed(pod, job)
-		if podutil.IsPodTerminal(pod) || considerPodFailed || finishedCond != nil || job.DeletionTimestamp != nil {
+		considerPodFailed := isPodFailed(pod, jobCtx.job)
+		if podutil.IsPodTerminal(pod) || considerPodFailed || jobCtx.finishedCondition != nil || jobCtx.job.DeletionTimestamp != nil {
 			podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
 		}
-		if pod.Status.Phase == v1.PodSucceeded && !uncounted.failed.Has(string(pod.UID)) {
+		if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
 			if isIndexed {
 				// The completion index is enough to avoid recounting succeeded pods.
 				// No need to track UIDs.
 				ix := getCompletionIndex(pod.Annotations)
-				if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && !succeededIndexes.has(ix) {
+				if ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) && !jobCtx.prevSucceededIndexes.has(ix) {
 					newSucceededIndexes = append(newSucceededIndexes, ix)
 					needsFlush = true
 				}
-			} else if !uncounted.succeeded.Has(string(pod.UID)) {
+			} else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) {
 				needsFlush = true
 				uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
 			}
-		} else if considerPodFailed || finishedCond != nil {
+		} else if considerPodFailed || jobCtx.finishedCondition != nil {
 			// When the job is considered finished, every non-terminated pod is considered failed
 			ix := getCompletionIndex(pod.Annotations)
-			if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) {
-				if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-					_, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
+			if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
+				if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
+					_, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
 					if action != nil {
 						podFailureCountByPolicyAction[string(*action)] += 1
 					}
@@ -1078,43 +1066,42 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syn
 		}
 	}
 	if isIndexed {
-		succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
-		succeededIndexesStr := succeededIndexes.String()
-		if succeededIndexesStr != job.Status.CompletedIndexes {
+		jobCtx.succeededIndexes = jobCtx.succeededIndexes.withOrderedIndexes(newSucceededIndexes)
+		succeededIndexesStr := jobCtx.succeededIndexes.String()
+		if succeededIndexesStr != jobCtx.job.Status.CompletedIndexes {
 			needsFlush = true
 		}
-		job.Status.Succeeded = int32(succeededIndexes.total())
-		job.Status.CompletedIndexes = succeededIndexesStr
+		jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total())
+		jobCtx.job.Status.CompletedIndexes = succeededIndexesStr
 	}
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
-		if finishedCond != nil && finishedCond.Type == batch.JobFailureTarget {
+		if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
 
 			// Append the interim FailureTarget condition to update the job status with before finalizers are removed.
-			job.Status.Conditions = append(job.Status.Conditions, *finishedCond)
+			jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
 			needsFlush = true
 
 			// Prepare the final Failed condition to update the job status with after the finalizers are removed.
 			// It is also used in the enactJobFinished function for reporting.
-			finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now())
+			jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now())
 		}
 	}
-	syncJobContext.podFailureCountByPolicyAction = podFailureCountByPolicyAction
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, syncJobContext, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
+	if jobCtx.job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, jobCtx, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
 		return err
 	}
-	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(job, finishedCond)
+	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(jobCtx.job, jobCtx.finishedCondition)
 	if jobFinished {
 		needsFlush = true
 	}
 	if needsFlush {
-		if _, err := jm.updateStatusHandler(ctx, job); err != nil {
+		if _, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
 		if jobFinished {
-			jm.recordJobFinished(job, finishedCond)
+			jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
 		}
-		recordJobPodFinished(logger, job, oldCounters)
+		recordJobPodFinished(logger, jobCtx.job, oldCounters)
 	}
 	return nil
 }
@@ -1130,18 +1117,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syn
 //
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
 	logger := klog.FromContext(ctx)
-	job := syncJobContext.job
-	newBackoffRecord := syncJobContext.newBackoffRecord
-	podFailureCountByPolicyAction := syncJobContext.podFailureCountByPolicyAction
 	var err error
 	if needsFlush {
-		if job, err = jm.updateStatusHandler(ctx, job); err != nil {
-			return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
+		if jobCtx.job, err = jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
+			return jobCtx.job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
 		}
 
-		err = jm.podBackoffStore.updateBackoffRecord(newBackoffRecord)
+		err = jm.podBackoffStore.updateBackoffRecord(jobCtx.newBackoffRecord)
 
 		if err != nil {
 			// this error might undercount the backoff.
@@ -1150,16 +1134,16 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syn
 			logger.Error(err, "Backoff update failed")
 		}
 
-		recordJobPodFinished(logger, job, *oldCounters)
+		recordJobPodFinished(logger, jobCtx.job, *oldCounters)
 		// Shallow copy, as it will only be used to detect changes in the counters.
-		*oldCounters = job.Status
+		*oldCounters = jobCtx.job.Status
 		needsFlush = false
 	}
-	recordJobPodFailurePolicyActions(job, podFailureCountByPolicyAction)
+	recordJobPodFailurePolicyActions(jobCtx.job, podFailureCountByPolicyAction)
 
-	jobKey, err := controller.KeyFunc(job)
+	jobKey, err := controller.KeyFunc(jobCtx.job)
 	if err != nil {
-		return job, needsFlush, fmt.Errorf("getting job key: %w", err)
+		return jobCtx.job, needsFlush, fmt.Errorf("getting job key: %w", err)
 	}
 	var rmErr error
 	if len(podsToRemoveFinalizer) > 0 {
@@ -1173,15 +1157,15 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syn
 	}
 	// Failed to remove some finalizers. Attempt to update the status with the
 	// partial progress.
-	if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
+	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
 		needsFlush = true
 	}
 	if rmErr != nil && needsFlush {
-		if job, err := jm.updateStatusHandler(ctx, job); err != nil {
+		if job, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
 			return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
 	}
-	return job, needsFlush, rmErr
+	return jobCtx.job, needsFlush, rmErr
 }
 
 // cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
@@ -1369,20 +1353,19 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
 
 // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
 // in the job status. The list of failed pods can be affected by the podFailurePolicy.
-func getNewFinishedPods(syncJobContext *syncJobContext) (succeededPods, failedPods []*v1.Pod) {
-	job := syncJobContext.job
-	succeededPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Succeeded(), func(p *v1.Pod) bool {
+func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) {
+	succeededPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Succeeded(), func(p *v1.Pod) bool {
 		return p.Status.Phase == v1.PodSucceeded
 	})
-	failedPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Failed(), func(p *v1.Pod) bool {
-		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-			if !isPodFailed(p, job) {
+	failedPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Failed(), func(p *v1.Pod) bool {
+		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
+			if !isPodFailed(p, jobCtx.job) {
 				return false
 			}
-			_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
+			_, countFailed, _ := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, p)
 			return countFailed
 		} else {
-			return isPodFailed(p, job)
+			return isPodFailed(p, jobCtx.job)
 		}
 	})
 	return succeededPods, failedPods
@@ -1398,15 +1381,9 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobContext) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
 	logger := klog.FromContext(ctx)
-	job := syncJobContext.job
-	activePods := syncJobContext.activePods
-	succeeded := syncJobContext.succeeded
-	succeededIndexes := syncJobContext.succeededIndexes
-	newBackoffRecord := syncJobContext.newBackoffRecord
-
-	active := int32(len(activePods))
+	active := int32(len(jobCtx.activePods))
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
 	if err != nil {
@@ -1416,7 +1393,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont
 
 	if jobSuspended(job) {
 		logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
-		podsToDelete := activePodsForRemoval(job, activePods, int(active))
+		podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
 		jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
 		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
@@ -1428,7 +1405,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont
 		// Job does not specify a number of completions. Therefore, number active
 		// should be equal to parallelism, unless the job has seen at least
 		// once success, in which leave whatever is running, running.
-		if succeeded > 0 {
+		if jobCtx.succeeded > 0 {
 			wantActive = active
 		} else {
 			wantActive = parallelism
@@ -1436,7 +1413,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont
 	} else {
 		// Job specifies a specific number of completions. Therefore, number
 		// active should not ever exceed number of remaining completions.
-		wantActive = *job.Spec.Completions - succeeded
+		wantActive = *job.Spec.Completions - jobCtx.succeeded
 		if wantActive > parallelism {
 			wantActive = parallelism
 		}
@@ -1449,7 +1426,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont
 	if rmAtLeast < 0 {
 		rmAtLeast = 0
 	}
-	podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
+	podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
 	if len(podsToDelete) > MaxPodCreateDeletePerSync {
 		podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
 	}
@@ -1466,7 +1443,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont
 	}
 
 	if active < wantActive {
-		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
+		remainingTime := jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
 		if remainingTime > 0 {
 			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
@@ -1484,7 +1461,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont
 
 	var indexesToAdd []int
 	if isIndexedJob(job) {
-		indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions))
+		indexesToAdd = firstPendingIndexes(jobCtx.activePods, jobCtx.succeededIndexes, int(diff), int(*job.Spec.Completions))
 		diff = int32(len(indexesToAdd))
 	}
 	active += diff
@@ -1600,22 +1577,19 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.
-func getValidPodsWithFilter(synJobContext *syncJobContext, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
-	job := synJobContext.job
-	pods := synJobContext.pods
-	expectedRmFinalizers := synJobContext.expectedRmFinalizers
+func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
 	var result []*v1.Pod
-	for _, p := range pods {
+	for _, p := range jobCtx.pods {
 		uid := string(p.UID)
 
 		// Pods that don't have a completion finalizer are in the uncounted set or
 		// have already been accounted for in the Job status.
-		if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid) {
+		if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) {
 			continue
 		}
-		if isIndexedJob(job) {
+		if isIndexedJob(jobCtx.job) {
 			idx := getCompletionIndex(p.Annotations)
-			if idx == unknownCompletionIndex || idx >= int(*job.Spec.Completions) {
+			if idx == unknownCompletionIndex || idx >= int(*jobCtx.job.Spec.Completions) {
 				continue
 			}
 		}
@@ -1079,8 +1079,8 @@ func TestGetNewFinshedPods(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-			syncJobContext := &syncJobContext{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
-			succeededPods, failedPods := getNewFinishedPods(syncJobContext)
+			jobCtx := &syncJobCtx{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
+			succeededPods, failedPods := getNewFinishedPods(jobCtx)
 			succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
 			failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
 			if succeeded != tc.wantSucceeded {
@@ -1655,7 +1655,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			if isIndexedJob(job) {
 				succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
 			}
-			syncJobContext := &syncJobContext{
+			jobCtx := &syncJobCtx{
 				job: job,
 				pods: tc.pods,
 				succeededIndexes: succeededIndexes,
@@ -1664,7 +1664,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 				finishedCondition: tc.finishedCond,
 				newBackoffRecord: backoffRecord{},
 			}
-			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), syncJobContext, tc.needsFlush)
+			err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush)
 			if !errors.Is(err, tc.wantErr) {
 				t.Errorf("Got error %v, want %v", err, tc.wantErr)
 			}