Merge pull request #105197 from alculquicondor/job-tracking

Roll-forward: Beta requirements for JobTrackingWithFinalizers
commit 0a29e2a73a
Kubernetes Prow Robot, 2021-10-04 18:57:49 -07:00, committed by GitHub
7 changed files with 652 additions and 110 deletions

View File

@@ -338,3 +338,10 @@ func (bci byCompletionIndex) Swap(i, j int) {
func (bci byCompletionIndex) Len() int {
return len(bci)
}
func completionModeStr(job *batch.Job) string {
if job.Spec.CompletionMode != nil {
return string(*job.Spec.CompletionMode)
}
return string(batch.NonIndexedCompletion)
}
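
For illustration, the new helper defaults a nil Spec.CompletionMode to NonIndexed; a minimal sketch of its behavior, assuming it sits in the same job controller package with its usual batch import:

package job

import (
	"fmt"

	batch "k8s.io/kubernetes/pkg/apis/batch"
)

func exampleCompletionMode() {
	// A Job whose Spec.CompletionMode is unset is treated as NonIndexed.
	fmt.Println(completionModeStr(&batch.Job{})) // prints "NonIndexed"
}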

View File

@@ -55,22 +55,21 @@ import (
"k8s.io/utils/integer"
)
-const (
-// maxUncountedPods is the maximum size the slices in
-// .status.uncountedTerminatedPods should have to keep their representation
-// roughly below 20 KB.
-maxUncountedPods = 500
-maxPodCreateDeletePerSync = 500
-)
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
var (
-// DefaultJobBackOff is the default backoff period, exported for the e2e test
// DefaultJobBackOff is the default backoff period. Exported for tests.
DefaultJobBackOff = 10 * time.Second
-// MaxJobBackOff is the max backoff period, exported for the e2e test
// MaxJobBackOff is the max backoff period. Exported for tests.
MaxJobBackOff = 360 * time.Second
// MaxUncountedPods is the maximum size the slices in
// .status.uncountedTerminatedPods should have to keep their representation
// roughly below 20 KB. Exported for tests.
MaxUncountedPods = 500
// MaxPodCreateDeletePerSync is the maximum number of pods that can be
// created or deleted in a single sync call. Exported for tests.
MaxPodCreateDeletePerSync = 500
)
// Controller ensures that all Job objects have corresponding pods to
@@ -94,6 +93,10 @@ type Controller struct {
// A TTLCache of pod creates/deletes each rc expects to see
expectations controller.ControllerExpectationsInterface
// finalizerExpectations tracks the Pod UIDs for which the controller
// expects to observe the tracking finalizer removed.
finalizerExpectations *uidTrackingExpectations
// A store of jobs
jobLister batchv1listers.JobLister
@@ -126,10 +129,11 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
},
expectations: controller.NewControllerExpectations(),
finalizerExpectations: newUIDTrackingExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
}
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -147,7 +151,9 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
-DeleteFunc: jm.deletePod,
DeleteFunc: func(obj interface{}) {
jm.deletePod(obj, true)
},
})
jm.podStore = podInformer.Lister()
jm.podStoreSynced = podInformer.Informer().HasSynced
@@ -228,7 +234,7 @@ func (jm *Controller) addPod(obj interface{}) {
if pod.DeletionTimestamp != nil {
// on a restart of the controller, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
-jm.deletePod(pod)
jm.deletePod(pod, false)
return
}
@@ -272,19 +278,31 @@ func (jm *Controller) updatePod(old, cur interface{}) {
// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
// for modification of the deletion timestamp and expect a job to create more pods asap, not wait
// until the kubelet actually deletes the pod.
-jm.deletePod(curPod)
jm.deletePod(curPod, false)
return
}
// the only time we want the backoff to kick-in, is when the pod failed
immediate := curPod.Status.Phase != v1.PodFailed
// Don't check if oldPod has the finalizer, as during ownership transfer
// finalizers might be re-added and removed again on behalf of the new owner.
// If all those Pod updates collapse into a single event, the finalizer
// might be removed in oldPod and curPod. We want to record the latest
// state.
finalizerRemoved := !hasJobTrackingFinalizer(curPod)
curControllerRef := metav1.GetControllerOf(curPod)
oldControllerRef := metav1.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
if finalizerRemoved {
key, err := controller.KeyFunc(job)
if err == nil {
jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
}
}
jm.enqueueController(job, immediate)
}
}
@@ -295,6 +313,12 @@ func (jm *Controller) updatePod(old, cur interface{}) {
if job == nil {
return
}
if finalizerRemoved {
key, err := controller.KeyFunc(job)
if err == nil {
jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
}
}
jm.enqueueController(job, immediate)
return
}
@@ -311,7 +335,7 @@ func (jm *Controller) updatePod(old, cur interface{}) {
// When a pod is deleted, enqueue the job that manages the pod and update its expectations.
// obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item.
-func (jm *Controller) deletePod(obj interface{}) {
func (jm *Controller) deletePod(obj interface{}, final bool) {
pod, ok := obj.(*v1.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
@@ -348,6 +372,13 @@ func (jm *Controller) deletePod(obj interface{}) {
return
}
jm.expectations.DeletionObserved(jobKey)
// Consider the finalizer removed if this is the final delete. Otherwise,
// it's an update for the deletion timestamp, so check the finalizer.
if final || !hasJobTrackingFinalizer(pod) {
jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
}
jm.enqueueController(job, true)
}
@@ -572,6 +603,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Job has been deleted: %v", key)
jm.expectations.DeleteExpectations(key)
jm.finalizerExpectations.deleteExpectations(key)
return true, nil
}
return false, err
@@ -610,6 +642,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
}()
var expectedRmFinalizers sets.String
var uncounted *uncountedTerminatedPods
if trackingUncountedPods(&job) {
klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
@@ -617,6 +650,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
} else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
if err := jm.patchJobHandler(&job, patch); err != nil {
return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
@@ -635,7 +669,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
activePods := controller.FilterActivePods(pods)
active := int32(len(activePods))
-succeeded, failed := getStatus(&job, pods, uncounted)
succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers)
// Job first start. Set StartTime and start the ActiveDeadlineSeconds timer
// only if the job is not in the suspended state.
if job.Status.StartTime == nil && !jobSuspended(&job) {
@@ -755,7 +789,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
if uncounted != nil {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active
job.Status.Active = active
-err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)
err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
if err != nil {
return false, fmt.Errorf("tracking status: %w", err)
}
@@ -873,7 +907,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
if len(podsWithFinalizer) == 0 {
return nil
}
-_, err := jm.removeTrackingFinalizerFromPods(podsWithFinalizer)
// Tracking with finalizers is disabled, no need to set expectations.
_, err := jm.removeTrackingFinalizerFromPods("", podsWithFinalizer)
return err
}
@@ -883,9 +918,9 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
// or the job was removed.
// 3. Increment job counters for pods that no longer have a finalizer.
// 4. Add Complete condition if satisfied with current counters.
-// It does this in a controlled way such that the size of .status doesn't grow
-// too much.
// It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs.
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error {
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error {
isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod
uncountedStatus := job.Status.UncountedTerminatedPods
@@ -896,15 +931,18 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
}
uidsWithFinalizer := make(sets.String, len(pods))
for _, p := range pods {
-if hasJobTrackingFinalizer(p) {
-uidsWithFinalizer.Insert(string(p.UID))
uid := string(p.UID)
if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) {
uidsWithFinalizer.Insert(uid)
}
}
// Shallow copy, as it will only be used to detect changes in the counters.
oldCounters := job.Status
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true
}
for _, pod := range pods {
-if !hasJobTrackingFinalizer(pod) {
if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) {
continue
}
podFinished := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
@@ -936,18 +974,16 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
}
}
-if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods {
-if len(newSucceededIndexes) > 0 {
-succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
-job.Status.Succeeded = int32(succeededIndexes.total())
-job.Status.CompletedIndexes = succeededIndexes.String()
-}
-var err error
-if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
-return err
-}
-podsToRemoveFinalizer = nil
-newSucceededIndexes = nil
// The controller added enough Pods already to .status.uncountedTerminatedPods.
// We stop counting pods and removing finalizers here to:
// 1. Ensure that the UIDs representation stays under 20 KB.
// 2. Cap the number of finalizer removals so that syncing of big Jobs
// doesn't starve smaller ones.
//
// The job will be synced again because the Job status and Pod updates
// will put the Job back to the work queue.
break
}
}
if len(newSucceededIndexes) > 0 {
@@ -956,7 +992,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
-if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
return err
}
if jm.enactJobFinished(job, finishedCond) {
@@ -966,6 +1002,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
if _, err := jm.updateStatusHandler(job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err)
}
recordJobPodFinished(job, oldCounters)
}
return nil
}
@@ -979,18 +1016,25 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
// 4. (if not all removals succeeded) flush Job status again.
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) {
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
var err error
if needsFlush {
if job, err = jm.updateStatusHandler(job); err != nil {
return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
}
recordJobPodFinished(job, *oldCounters)
// Shallow copy, as it will only be used to detect changes in the counters.
*oldCounters = job.Status
needsFlush = false
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
return job, needsFlush, fmt.Errorf("getting job key: %w", err)
}
var rmErr error
if len(podsToRemoveFinalizer) > 0 {
var rmSucceded []bool
-rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer)
rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(jobKey, podsToRemoveFinalizer)
for i, p := range podsToRemoveFinalizer {
if rmSucceded[i] {
uidsWithFinalizer.Delete(string(p.UID))
@@ -1036,9 +1080,19 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali
// returns an array of booleans where the i-th value is true if the finalizer
// of the i-th Pod was successfully removed (if the pod was deleted when this
// function was called, it's considered as the finalizer was removed successfully).
-func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) {
func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1.Pod) ([]bool, error) {
errCh := make(chan error, len(pods))
succeeded := make([]bool, len(pods))
uids := make([]string, len(pods))
for i, p := range pods {
uids[i] = string(p.UID)
}
if jobKey != "" {
err := jm.finalizerExpectations.expectFinalizersRemoved(jobKey, uids)
if err != nil {
return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err)
}
}
wg := sync.WaitGroup{}
wg.Add(len(pods))
for i := range pods {
@@ -1046,10 +1100,17 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, e
pod := pods[i]
defer wg.Done()
if patch := removeTrackingFinalizerPatch(pod); patch != nil {
-if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) {
-errCh <- err
-utilruntime.HandleError(err)
-return
-}
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil {
// In case of any failure, we don't expect a Pod update for the
// finalizer removed. Clear expectation now.
if jobKey != "" {
jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
}
if !apierrors.IsNotFound(err) {
errCh <- err
utilruntime.HandleError(err)
return
}
}
succeeded[i] = true
}
@@ -1152,15 +1213,15 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu
}
// getStatus returns number of succeeded and failed pods running a job
-func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods) (succeeded, failed int32) {
func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) {
if uncounted != nil {
succeeded = job.Status.Succeeded
failed = job.Status.Failed
}
-succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), func(p *v1.Pod) bool {
succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
return p.Status.Phase == v1.PodSucceeded
}))
-failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), func(p *v1.Pod) bool {
failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
if p.Status.Phase == v1.PodFailed {
return true
}
@@ -1226,12 +1287,12 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
rmAtLeast = 0
}
podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
-if len(podsToDelete) > maxPodCreateDeletePerSync {
if len(podsToDelete) > MaxPodCreateDeletePerSync {
-podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]
podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
}
if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
-klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
active -= removed
// While it is possible for a Job to require both pod creations and
@@ -1243,8 +1304,8 @@
if active < wantActive {
diff := wantActive - active
-if diff > int32(maxPodCreateDeletePerSync) {
if diff > int32(MaxPodCreateDeletePerSync) {
-diff = int32(maxPodCreateDeletePerSync)
diff = int32(MaxPodCreateDeletePerSync)
}
jm.expectations.ExpectCreations(jobKey, int(diff))
@@ -1392,12 +1453,13 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur
// countValidPodsWithFilter returns number of valid pods that pass the filter.
// Pods are valid if they have a finalizer and, for Indexed Jobs, a valid
// completion index.
-func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, filter func(*v1.Pod) bool) int {
func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) int {
result := len(uncounted)
for _, p := range pods {
uid := string(p.UID)
// Pods that don't have a completion finalizer are in the uncounted set or
// have already been accounted for in the Job status.
-if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(string(p.UID))) {
if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid)) {
continue
}
if isIndexedJob(job) {
@@ -1538,3 +1600,11 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
}
return list, false
}
func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
completionMode := completionModeStr(job)
diff := job.Status.Succeeded - oldCounters.Succeeded
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff))
diff = job.Status.Failed - oldCounters.Failed
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
}
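
(For scale: a Pod UID is a 36-character UUID string, so 500 of them serialized into .status.uncountedTerminatedPods amount to roughly 500 x ~40 bytes, about 20 KB, which is the budget the MaxUncountedPods comment above refers to; a rough back-of-the-envelope estimate.)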

View File

@@ -47,8 +47,10 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
metricstestutil "k8s.io/component-base/metrics/testutil"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
@@ -114,6 +116,7 @@ func newPod(name string, job *batch.Job) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
Labels: job.Spec.Selector.MatchLabels,
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
@@ -1001,10 +1004,11 @@ func TestSyncJobLegacyTracking(t *testing.T) {
func TestGetStatus(t *testing.T) {
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
expectedRmFinalizers sets.String
wantSucceeded int32
wantFailed int32
}{
"without finalizers": {
job: batch.Job{
@@ -1066,6 +1070,30 @@
wantSucceeded: 4,
wantFailed: 4,
},
"with expected removed finalizers": {
job: batch.Job{
Status: batch.JobStatus{
Succeeded: 2,
Failed: 2,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a"},
Failed: []types.UID{"d"},
},
},
},
expectedRmFinalizers: sets.NewString("b", "f"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("d").phase(v1.PodFailed).Pod,
buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod,
},
wantSucceeded: 4,
wantFailed: 5,
},
"deleted pods": { "deleted pods": {
pods: []*v1.Pod{ pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod, buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod,
@@ -1102,7 +1130,7 @@
if tc.job.Status.UncountedTerminatedPods != nil {
uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
}
-succeeded, failed := getStatus(&tc.job, tc.pods, uncounted)
succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
if succeeded != tc.wantSucceeded {
t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded)
}
@@ -1119,15 +1147,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
indexedCompletion := batch.IndexedCompletion
mockErr := errors.New("mock error")
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
finishedCond *batch.JobCondition
expectedRmFinalizers sets.String
needsFlush bool
statusUpdateErr error
podControlErr error
wantErr error
wantRmFinalizers int
wantStatusUpdates []batch.JobStatus
}{
"no updates": {},
"new active": {
@@ -1209,6 +1238,45 @@
},
},
},
"expecting removed finalizers": {
job: batch.Job{
Status: batch.JobStatus{
Succeeded: 2,
Failed: 3,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "g"},
Failed: []types.UID{"b", "h"},
},
},
},
expectedRmFinalizers: sets.NewString("c", "d", "g", "h"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod,
},
wantRmFinalizers: 4,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "e"},
Failed: []types.UID{"b", "f"},
},
Succeeded: 3,
Failed: 4,
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 5,
Failed: 6,
},
},
},
"succeeding job": { "succeeding job": {
pods: []*v1.Pod{ pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@@ -1462,7 +1530,7 @@
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
return pods
}(),
-wantRmFinalizers: 501,
wantRmFinalizers: 499,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
@@ -1479,17 +1547,11 @@
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
-Succeeded: []types.UID{"499"},
Failed: []types.UID{"b"},
},
Succeeded: 499,
Failed: 1,
},
-{
-UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
-Succeeded: 500,
-Failed: 2,
-},
},
},
"too many indexed finished": { "too many indexed finished": {
@ -1506,18 +1568,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
} }
return pods return pods
}(), }(),
wantRmFinalizers: 501, wantRmFinalizers: 500,
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
CompletedIndexes: "0-499", CompletedIndexes: "0-499",
Succeeded: 500, Succeeded: 500,
}, },
{
CompletedIndexes: "0-500",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 501,
},
}, },
}, },
}
@@ -1526,19 +1583,20 @@
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
metrics.JobPodsFinished.Reset()
manager.podControl = &fakePodControl
var statusUpdates []batch.JobStatus
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
return job, tc.statusUpdateErr
}
job := tc.job.DeepCopy()
-if tc.job.Status.UncountedTerminatedPods == nil {
if job.Status.UncountedTerminatedPods == nil {
-tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
-uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-succeededIndexes := succeededIndexesFromJob(&tc.job)
succeededIndexes := succeededIndexesFromJob(job)
-err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush)
if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr)
}
@@ -1549,6 +1607,25 @@
if rmFinalizers != tc.wantRmFinalizers {
t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers)
}
if tc.wantErr == nil {
completionMode := completionModeStr(job)
v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded))
if err != nil {
t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err)
}
newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded
if float64(newSucceeded) != v {
t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded)
}
v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed))
if err != nil {
t.Fatalf("Obtaining failed job_pods_finished_total: %v", err)
}
newFailed := job.Status.Failed - tc.job.Status.Failed
if float64(newFailed) != v {
t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed)
}
}
})
}
}
@@ -2274,7 +2351,7 @@
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
-jm.deletePod(pod1)
jm.deletePod(pod1, true)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@@ -2287,7 +2364,7 @@
t.Errorf("queue.Get() = %v, want %v", got, want)
}
-jm.deletePod(pod2)
jm.deletePod(pod2, true)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@@ -2322,7 +2399,7 @@ func TestDeletePodOrphan(t *testing.T) {
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
-jm.deletePod(pod1)
jm.deletePod(pod1, true)
if got, want := jm.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@@ -2966,6 +3043,105 @@ func TestEnsureJobConditions(t *testing.T) {
}
}
func TestFinalizersRemovedExpectations(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
clientset := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")}
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
return job, nil
}
job := newJob(2, 2, 6, batch.NonIndexedCompletion)
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
podInformer := sharedInformers.Core().V1().Pods().Informer()
podIndexer := podInformer.GetIndexer()
uids := sets.NewString()
for i := range pods {
clientset.Tracker().Add(&pods[i])
podIndexer.Add(&pods[i])
uids.Insert(string(pods[i].UID))
}
jobKey := testutil.GetKey(job, t)
manager.syncJob(jobKey)
gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey)
if len(gotExpectedUIDs) != 0 {
t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List())
}
// Remove failures and re-sync.
manager.podControl.(*controller.FakePodControl).Err = nil
manager.syncJob(jobKey)
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff)
}
stopCh := make(chan struct{})
defer close(stopCh)
go sharedInformers.Core().V1().Pods().Informer().Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
// Make sure the first syncJob sets the expectations, even after the caches synced.
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff)
}
// Change pods in different ways.
podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
update := pods[0].DeepCopy()
update.Finalizers = nil
update.ResourceVersion = "1"
err := clientset.Tracker().Update(podsResource, update, update.Namespace)
if err != nil {
t.Errorf("Removing finalizer: %v", err)
}
update = pods[1].DeepCopy()
update.Finalizers = nil
update.DeletionTimestamp = &metav1.Time{Time: time.Now()}
update.ResourceVersion = "1"
err = clientset.Tracker().Update(podsResource, update, update.Namespace)
if err != nil {
t.Errorf("Removing finalizer and setting deletion timestamp: %v", err)
}
// Preserve the finalizer.
update = pods[2].DeepCopy()
update.DeletionTimestamp = &metav1.Time{Time: time.Now()}
update.ResourceVersion = "1"
err = clientset.Tracker().Update(podsResource, update, update.Namespace)
if err != nil {
t.Errorf("Setting deletion timestamp: %v", err)
}
err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name)
if err != nil {
t.Errorf("Deleting pod that had finalizer: %v", err)
}
uids = sets.NewString(string(pods[2].UID))
var diff string
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
diff = cmp.Diff(uids, gotExpectedUIDs)
return diff == "", nil
}); err != nil {
t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff)
}
}
func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
t.Helper()
want := []v1.EnvVar{

View File

@@ -68,10 +68,26 @@ var (
},
[]string{"completion_mode", "result"},
)
// JobPodsFinished records the number of finished Pods that the job controller
// finished tracking.
// It only applies to Jobs that were created while the feature gate
// JobTrackingWithFinalizers was enabled.
// Possible label values:
// completion_mode: Indexed, NonIndexed
// result: failed, succeeded
JobPodsFinished = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_pods_finished_total",
Help: "The number of finished Pods that are fully tracked",
},
[]string{"completion_mode", "result"})
)
// Possible values for the "action" label in the above metrics.
const (
-// Possible values for the "action" label in the above metrics.
// JobSyncActionReconciling when the Job's pod creation/deletion expectations
// are unsatisfied and the controller is waiting for issued Pod
// creation/deletions to complete.
@@ -88,6 +104,11 @@ const (
// if a Job is suspended or if the number of active Pods is more than
// parallelism.
JobSyncActionPodsDeleted = "pods_deleted"
// Possible values for "result" label in the above metrics.
Succeeded = "succeeded"
Failed = "failed"
)
var registerMetrics sync.Once
@@ -98,5 +119,6 @@ func Register() {
legacyregistry.MustRegister(JobSyncDurationSeconds)
legacyregistry.MustRegister(JobSyncNum)
legacyregistry.MustRegister(JobFinishedNum)
legacyregistry.MustRegister(JobPodsFinished)
})
}
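
For reference, reading the new counter in a test can reuse the helpers this PR's unit tests already import; a sketch, with the label values and the placement next to the controller tests being illustrative assumptions:

package job

import (
	"testing"

	metricstestutil "k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/controller/job/metrics"
)

// succeededFinishedCount returns the current value of the counter with
// labels completion_mode="NonIndexed" and result="succeeded".
func succeededFinishedCount(t *testing.T) float64 {
	t.Helper()
	v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues("NonIndexed", metrics.Succeeded))
	if err != nil {
		t.Fatalf("reading job_pods_finished_total: %v", err)
	}
	return v
}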

View File

@@ -0,0 +1,117 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// uidSetKeyFunc parses out the key from a uidSet.
var uidSetKeyFunc = func(obj interface{}) (string, error) {
if u, ok := obj.(*uidSet); ok {
return u.key, nil
}
return "", fmt.Errorf("could not find key for obj %#v", obj)
}
// uidSet holds a key and a set of UIDs. Used by uidTrackingExpectations to
// remember which UIDs it is still waiting for.
type uidSet struct {
sync.RWMutex
set sets.String
key string
}
// uidTrackingExpectations tracks the UIDs of Pods for which the controller is
// waiting to observe the removal of the tracking finalizer.
type uidTrackingExpectations struct {
store cache.Store
}
// getSet returns the UID set stored for the given controller key, or nil if
// none exists. The returned set is not thread safe; callers must hold the
// set's own lock when reading or modifying it.
func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet {
if obj, exists, err := u.store.GetByKey(controllerKey); err == nil && exists {
return obj.(*uidSet)
}
return nil
}
func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.String {
uids := u.getSet(controllerKey)
if uids == nil {
return nil
}
uids.RLock()
set := sets.NewString(uids.set.UnsortedList()...)
uids.RUnlock()
return set
}
// expectFinalizersRemoved records the expectation that the tracking finalizer
// will be removed from the Pods with the given UIDs, against the given job key.
// This is thread-safe across different job keys.
func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deletedKeys []string) error {
klog.V(4).InfoS("Expecting tracking finalizers removed", "job", jobKey, "podUIDs", deletedKeys)
uids := u.getSet(jobKey)
if uids == nil {
uids = &uidSet{
key: jobKey,
set: sets.NewString(),
}
if err := u.store.Add(uids); err != nil {
return err
}
}
uids.Lock()
uids.set.Insert(deletedKeys...)
uids.Unlock()
return nil
}
// finalizerRemovalObserved records that the tracking finalizer was removed from
// the Pod with the given UID, for the given job.
func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey string) {
uids := u.getSet(jobKey)
if uids != nil {
uids.Lock()
if uids.set.Has(deleteKey) {
klog.V(4).InfoS("Observed tracking finalizer removed", "job", jobKey, "podUID", deleteKey)
uids.set.Delete(deleteKey)
}
uids.Unlock()
}
}
// deleteExpectations deletes the UID set for the given job key.
func (u *uidTrackingExpectations) deleteExpectations(jobKey string) {
if err := u.store.Delete(jobKey); err != nil {
klog.ErrorS(err, "deleting tracking annotation UID expectations", "job", jobKey)
}
}
// newUIDTrackingExpectations returns an empty set of expectations for tracking
// the removal of Pod finalizers, keyed by job.
func newUIDTrackingExpectations() *uidTrackingExpectations {
return &uidTrackingExpectations{store: cache.NewStore(uidSetKeyFunc)}
}
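
An illustrative walk through the lifecycle above, in the order the controller exercises it (a sketch assuming it runs inside this job package; the job key and UIDs are made up):

func exampleExpectationsLifecycle() {
	exp := newUIDTrackingExpectations()

	// syncJob is about to patch the tracking finalizer off two pods.
	_ = exp.expectFinalizersRemoved("default/myjob", []string{"uid-a", "uid-b"})

	// A pod event handler observes one finalizer gone and clears that UID.
	exp.finalizerRemovalObserved("default/myjob", "uid-a")

	// The next sync treats still-expected UIDs as already counted, so a stale
	// informer cache cannot cause double counting.
	pending := exp.getExpectedUIDs("default/myjob") // contains only "uid-b"
	_ = pending

	// Once the Job is deleted, the whole set is dropped.
	exp.deleteExpectations("default/myjob")
}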

View File

@@ -0,0 +1,111 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"sync"
"testing"
"github.com/google/go-cmp/cmp"
)
func TestUIDTrackingExpectations(t *testing.T) {
tracks := []struct {
job string
firstRound []string
secondRound []string
}{
{
job: "foo",
firstRound: []string{"a", "b", "c", "d"},
secondRound: []string{"e", "f"},
},
{
job: "bar",
firstRound: []string{"x", "y", "z"},
secondRound: []string{"u", "v", "w"},
},
{
job: "baz",
firstRound: []string{"w"},
secondRound: []string{"a"},
},
}
expectations := newUIDTrackingExpectations()
// Insert first round of keys in parallel.
var wg sync.WaitGroup
wg.Add(len(tracks))
errs := make([]error, len(tracks))
for i := range tracks {
track := tracks[i]
go func(errID int) {
errs[errID] = expectations.expectFinalizersRemoved(track.job, track.firstRound)
wg.Done()
}(i)
}
wg.Wait()
for i, err := range errs {
if err != nil {
t.Errorf("Failed adding first round of UIDs for job %s: %v", tracks[i].job, err)
}
}
for _, track := range tracks {
uids := expectations.getSet(track.job)
if uids == nil {
t.Errorf("Set of UIDs is empty for job %s", track.job)
} else if diff := cmp.Diff(track.firstRound, uids.set.List()); diff != "" {
t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff)
}
}
// Delete the first round of keys and add the second round in parallel.
for i, track := range tracks {
wg.Add(len(track.firstRound) + 1)
track := track
for _, uid := range track.firstRound {
uid := uid
go func() {
expectations.finalizerRemovalObserved(track.job, uid)
wg.Done()
}()
}
go func(errID int) {
errs[errID] = expectations.expectFinalizersRemoved(track.job, track.secondRound)
wg.Done()
}(i)
}
wg.Wait()
for i, err := range errs {
if err != nil {
t.Errorf("Failed adding second round of UIDs for job %s: %v", tracks[i].job, err)
}
}
for _, track := range tracks {
uids := expectations.getSet(track.job)
if uids == nil {
t.Errorf("Set of UIDs is empty for job %s", track.job)
} else if diff := cmp.Diff(track.secondRound, uids.set.List()); diff != "" {
t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff)
}
}
}

View File

@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"testing"
"time"
@@ -220,6 +221,10 @@ func TestParallelJobParallelism(t *testing.T) {
}
func TestParallelJobWithCompletions(t *testing.T) {
// Lower limits for a job sync so that we can test partial updates with a low
// number of pods.
t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
for _, wFinalizers := range []bool{false, true} {
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
@@ -230,8 +235,8 @@ func TestParallelJobWithCompletions(t *testing.T) {
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
-Parallelism: pointer.Int32Ptr(4),
Parallelism: pointer.Int32Ptr(54),
-Completions: pointer.Int32Ptr(6),
Completions: pointer.Int32Ptr(56),
},
})
if err != nil {
@@ -241,23 +246,23 @@
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-Active: 4,
Active: 54,
}, wFinalizers)
// Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-Active: 4,
Active: 54,
Failed: 2,
}, wFinalizers)
// Pods are created until the number of succeeded Pods equals completions.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2,
-Succeeded: 3,
Succeeded: 53,
Active: 3,
}, wFinalizers)
// No more Pods are created after the Job completes.
@@ -267,7 +272,7 @@
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2,
-Succeeded: 6,
Succeeded: 56,
}, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
@@ -781,22 +786,44 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
if err != nil {
return fmt.Errorf("listing Job Pods: %w", err)
}
updates := make([]v1.Pod, 0, cnt)
for _, pod := range pods.Items {
-if cnt == 0 {
if len(updates) == cnt {
break
}
if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
pod.Status.Phase = phase
-_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
-if err != nil {
-return fmt.Errorf("updating Pod status: %w", err)
-}
-cnt--
updates = append(updates, pod)
}
}
-if cnt != 0 {
if len(updates) != cnt {
return fmt.Errorf("couldn't set phase on %d Job Pods", cnt)
}
return updatePodStatuses(ctx, clientSet, updates)
}
func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error {
wg := sync.WaitGroup{}
wg.Add(len(updates))
errCh := make(chan error, len(updates))
for _, pod := range updates {
pod := pod
go func() {
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
if err != nil {
errCh <- err
}
wg.Done()
}()
}
wg.Wait()
select {
case err := <-errCh:
return fmt.Errorf("updating Pod status: %w", err)
default:
}
return nil
}
@@ -860,7 +887,11 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig)
-config := restclient.Config{Host: server.URL}
config := restclient.Config{
Host: server.URL,
QPS: 200.0,
Burst: 200,
}
clientSet, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
@@ -899,3 +930,11 @@ func hasJobTrackingAnnotation(job *batchv1.Job) bool {
_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
return ok
}
func setDuringTest(val *int, newVal int) func() {
origVal := *val
*val = newVal
return func() {
*val = origVal
}
}