diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go
index 7fa9bdc22b4..a6d953fa1c4 100644
--- a/pkg/controller/job/indexed_job_utils.go
+++ b/pkg/controller/job/indexed_job_utils.go
@@ -338,3 +338,10 @@ func (bci byCompletionIndex) Swap(i, j int) {
 func (bci byCompletionIndex) Len() int {
 	return len(bci)
 }
+
+func completionModeStr(job *batch.Job) string {
+	if job.Spec.CompletionMode != nil {
+		return string(*job.Spec.CompletionMode)
+	}
+	return string(batch.NonIndexedCompletion)
+}
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 31d4727d897..9a54f45b1e9 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -55,22 +55,21 @@ import (
 	"k8s.io/utils/integer"
 )

-const (
-	// maxUncountedPods is the maximum size the slices in
-	// .status.uncountedTerminatedPods should have to keep their representation
-	// roughly below 20 KB.
-	maxUncountedPods = 500
-	maxPodCreateDeletePerSync = 500
-)
-
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

 var (
-	// DefaultJobBackOff is the default backoff period, exported for the e2e test
+	// DefaultJobBackOff is the default backoff period. Exported for tests.
 	DefaultJobBackOff = 10 * time.Second
-	// MaxJobBackOff is the max backoff period, exported for the e2e test
+	// MaxJobBackOff is the max backoff period. Exported for tests.
 	MaxJobBackOff = 360 * time.Second
+	// MaxUncountedPods is the maximum size the slices in
+	// .status.uncountedTerminatedPods should have to keep their representation
+	// roughly below 20 KB. Exported for tests.
+	MaxUncountedPods = 500
+	// MaxPodCreateDeletePerSync is the maximum number of pods that can be
+	// created or deleted in a single sync call. Exported for tests.
+	MaxPodCreateDeletePerSync = 500
 )

 // Controller ensures that all Job objects have corresponding pods to
@@ -94,6 +93,10 @@ type Controller struct {
 	// A TTLCache of pod creates/deletes each rc expects to see
 	expectations controller.ControllerExpectationsInterface

+	// finalizerExpectations tracks the Pod UIDs for which the controller
+	// expects to observe the tracking finalizer removed.
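+	// Pods in this set are excluded from the Job status counters until the
+	// removal is observed, which prevents counting a finished Pod twice.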
+	finalizerExpectations *uidTrackingExpectations
+
 	// A store of jobs
 	jobLister batchv1listers.JobLister
@@ -126,10 +129,11 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 			KubeClient: kubeClient,
 			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		},
-		expectations: controller.NewControllerExpectations(),
-		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
-		orphanQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
-		recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
+		expectations:          controller.NewControllerExpectations(),
+		finalizerExpectations: newUIDTrackingExpectations(),
+		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
+		orphanQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
+		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 	}

 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -147,7 +151,9 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    jm.addPod,
 		UpdateFunc: jm.updatePod,
-		DeleteFunc: jm.deletePod,
+		DeleteFunc: func(obj interface{}) {
+			jm.deletePod(obj, true)
+		},
 	})
 	jm.podStore = podInformer.Lister()
 	jm.podStoreSynced = podInformer.Informer().HasSynced
@@ -228,7 +234,7 @@ func (jm *Controller) addPod(obj interface{}) {
 	if pod.DeletionTimestamp != nil {
 		// on a restart of the controller, it's possible a new pod shows up in a state that
 		// is already pending deletion. Prevent the pod from being a creation observation.
-		jm.deletePod(pod)
+		jm.deletePod(pod, false)
 		return
 	}

@@ -272,19 +278,31 @@ func (jm *Controller) updatePod(old, cur interface{}) {
 		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
 		// for modification of the deletion timestamp and expect an job to create more pods asap, not wait
 		// until the kubelet actually deletes the pod.
-		jm.deletePod(curPod)
+		jm.deletePod(curPod, false)
 		return
 	}

 	// the only time we want the backoff to kick-in, is when the pod failed
 	immediate := curPod.Status.Phase != v1.PodFailed

+	// Don't check if oldPod has the finalizer, as during ownership transfer
+	// finalizers might be re-added and removed again on behalf of the new owner.
+	// If all those Pod updates collapse into a single event, the finalizer
+	// might be removed in oldPod and curPod. We want to record the latest
+	// state.
+	finalizerRemoved := !hasJobTrackingFinalizer(curPod)
 	curControllerRef := metav1.GetControllerOf(curPod)
 	oldControllerRef := metav1.GetControllerOf(oldPod)
 	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
 	if controllerRefChanged && oldControllerRef != nil {
 		// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { + if finalizerRemoved { + key, err := controller.KeyFunc(job) + if err == nil { + jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) + } + } jm.enqueueController(job, immediate) } } @@ -295,6 +313,12 @@ func (jm *Controller) updatePod(old, cur interface{}) { if job == nil { return } + if finalizerRemoved { + key, err := controller.KeyFunc(job) + if err == nil { + jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) + } + } jm.enqueueController(job, immediate) return } @@ -311,7 +335,7 @@ func (jm *Controller) updatePod(old, cur interface{}) { // When a pod is deleted, enqueue the job that manages the pod and update its expectations. // obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item. -func (jm *Controller) deletePod(obj interface{}) { +func (jm *Controller) deletePod(obj interface{}, final bool) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not @@ -348,6 +372,13 @@ func (jm *Controller) deletePod(obj interface{}) { return } jm.expectations.DeletionObserved(jobKey) + + // Consider the finalizer removed if this is the final delete. Otherwise, + // it's an update for the deletion timestamp, then check finalizer. + if final || !hasJobTrackingFinalizer(pod) { + jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) + } + jm.enqueueController(job, true) } @@ -572,6 +603,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { if apierrors.IsNotFound(err) { klog.V(4).Infof("Job has been deleted: %v", key) jm.expectations.DeleteExpectations(key) + jm.finalizerExpectations.deleteExpectations(key) return true, nil } return false, err @@ -610,6 +642,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc() }() + var expectedRmFinalizers sets.String var uncounted *uncountedTerminatedPods if trackingUncountedPods(&job) { klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job)) @@ -617,6 +650,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) + expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key) } else if patch := removeTrackingAnnotationPatch(&job); patch != nil { if err := jm.patchJobHandler(&job, patch); err != nil { return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err) @@ -635,7 +669,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) - succeeded, failed := getStatus(&job, pods, uncounted) + succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers) // Job first start. Set StartTime and start the ActiveDeadlineSeconds timer // only if the job is not in the suspended state. 
if job.Status.StartTime == nil && !jobSuspended(&job) { @@ -755,7 +789,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { if uncounted != nil { needsStatusUpdate := suspendCondChanged || active != job.Status.Active job.Status.Active = active - err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate) + err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) if err != nil { return false, fmt.Errorf("tracking status: %w", err) } @@ -873,7 +907,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error if len(podsWithFinalizer) == 0 { return nil } - _, err := jm.removeTrackingFinalizerFromPods(podsWithFinalizer) + // Tracking with finalizers is disabled, no need to set expectations. + _, err := jm.removeTrackingFinalizerFromPods("", podsWithFinalizer) return err } @@ -883,9 +918,9 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // or the job was removed. // 3. Increment job counters for pods that no longer have a finalizer. // 4. Add Complete condition if satisfied with current counters. -// It does this in a controlled way such that the size of .status doesn't grow -// too much. -func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { +// It does this up to a limited number of Pods so that the size of .status +// doesn't grow too much and this sync doesn't starve other Jobs. +func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod uncountedStatus := job.Status.UncountedTerminatedPods @@ -896,15 +931,18 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* } uidsWithFinalizer := make(sets.String, len(pods)) for _, p := range pods { - if hasJobTrackingFinalizer(p) { - uidsWithFinalizer.Insert(string(p.UID)) + uid := string(p.UID) + if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) { + uidsWithFinalizer.Insert(uid) } } + // Shallow copy, as it will only be used to detect changes in the counters. 
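+	// oldCounters still shares slices and pointers with job.Status; that is
+	// safe because only the integer counters Succeeded and Failed are read
+	// from it afterwards.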
+	oldCounters := job.Status
 	if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
 		needsFlush = true
 	}
 	for _, pod := range pods {
-		if !hasJobTrackingFinalizer(pod) {
+		if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) {
 			continue
 		}
 		podFinished := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
@@ -936,18 +974,16 @@
 				uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
 			}
 		}
-		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
-			if len(newSucceededIndexes) > 0 {
-				succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
-				job.Status.Succeeded = int32(succeededIndexes.total())
-				job.Status.CompletedIndexes = succeededIndexes.String()
-			}
-			var err error
-			if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
-				return err
-			}
-			podsToRemoveFinalizer = nil
-			newSucceededIndexes = nil
+		if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods {
+			// The controller already added enough Pods to .status.uncountedTerminatedPods.
+			// We stop counting pods and removing finalizers here to:
+			// 1. Ensure that the representation of the UIDs stays under 20 KB.
+			// 2. Cap the number of finalizer removals so that syncing of big Jobs
+			//    doesn't starve smaller ones.
+			//
+			// The job will be synced again because the Job status and Pod updates
+			// will put the Job back into the work queue.
+			break
 		}
 	}
 	if len(newSucceededIndexes) > 0 {
@@ -956,7 +992,7 @@
 		job.Status.CompletedIndexes = succeededIndexes.String()
 	}
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
+	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
 	if jm.enactJobFinished(job, finishedCond) {
@@ -966,6 +1002,7 @@
 		if _, err := jm.updateStatusHandler(job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
+		recordJobPodFinished(job, oldCounters)
 	}
 	return nil
 }
@@ -979,18 +1016,25 @@
 // 4. (if not all removals succeeded) flush Job status again.
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
 	var err error
 	if needsFlush {
 		if job, err = jm.updateStatusHandler(job); err != nil {
 			return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
 		}
+		recordJobPodFinished(job, *oldCounters)
+		// Shallow copy, as it will only be used to detect changes in the counters.
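+		// After a successful flush, the baseline moves forward so that the
+		// next recordJobPodFinished call only reports newly counted Pods.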
+		*oldCounters = job.Status
 		needsFlush = false
 	}
+	jobKey, err := controller.KeyFunc(job)
+	if err != nil {
+		return job, needsFlush, fmt.Errorf("getting job key: %w", err)
+	}
 	var rmErr error
 	if len(podsToRemoveFinalizer) > 0 {
 		var rmSucceded []bool
-		rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer)
+		rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(jobKey, podsToRemoveFinalizer)
 		for i, p := range podsToRemoveFinalizer {
 			if rmSucceded[i] {
 				uidsWithFinalizer.Delete(string(p.UID))
@@ -1036,9 +1080,19 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali
 // returns an array of booleans where the i-th value is true if the finalizer
 // of the i-th Pod was successfully removed (if the pod was deleted when this
 // function was called, it's considered as the finalizer was removed successfully).
-func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) {
+func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1.Pod) ([]bool, error) {
 	errCh := make(chan error, len(pods))
 	succeeded := make([]bool, len(pods))
+	uids := make([]string, len(pods))
+	for i, p := range pods {
+		uids[i] = string(p.UID)
+	}
+	if jobKey != "" {
+		err := jm.finalizerExpectations.expectFinalizersRemoved(jobKey, uids)
+		if err != nil {
+			return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err)
+		}
+	}
 	wg := sync.WaitGroup{}
 	wg.Add(len(pods))
 	for i := range pods {
@@ -1046,10 +1100,17 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, e
 			pod := pods[i]
 			defer wg.Done()
 			if patch := removeTrackingFinalizerPatch(pod); patch != nil {
-				if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) {
-					errCh <- err
-					utilruntime.HandleError(err)
-					return
+				if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil {
+					// In case of any failure, we don't expect a Pod update for the
+					// removed finalizer. Clear the expectation now.
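+					// Without this, the Pod would remain in the expected-removals
+					// set and be skipped by future syncs even though it still has
+					// the finalizer.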
+ if jobKey != "" { + jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) + } + if !apierrors.IsNotFound(err) { + errCh <- err + utilruntime.HandleError(err) + return + } } succeeded[i] = true } @@ -1152,15 +1213,15 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu } // getStatus returns number of succeeded and failed pods running a job -func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods) (succeeded, failed int32) { +func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) { if uncounted != nil { succeeded = job.Status.Succeeded failed = job.Status.Failed } - succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), func(p *v1.Pod) bool { + succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool { return p.Status.Phase == v1.PodSucceeded })) - failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), func(p *v1.Pod) bool { + failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool { if p.Status.Phase == v1.PodFailed { return true } @@ -1226,12 +1287,12 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded rmAtLeast = 0 } podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast)) - if len(podsToDelete) > maxPodCreateDeletePerSync { - podsToDelete = podsToDelete[:maxPodCreateDeletePerSync] + if len(podsToDelete) > MaxPodCreateDeletePerSync { + podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync] } if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) - klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism) + klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) active -= removed // While it is possible for a Job to require both pod creations and @@ -1243,8 +1304,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded if active < wantActive { diff := wantActive - active - if diff > int32(maxPodCreateDeletePerSync) { - diff = int32(maxPodCreateDeletePerSync) + if diff > int32(MaxPodCreateDeletePerSync) { + diff = int32(MaxPodCreateDeletePerSync) } jm.expectations.ExpectCreations(jobKey, int(diff)) @@ -1392,12 +1453,13 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur // countValidPodsWithFilter returns number of valid pods that pass the filter. // Pods are valid if they have a finalizer and, for Indexed Jobs, a valid // completion index. -func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, filter func(*v1.Pod) bool) int { +func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) int { result := len(uncounted) for _, p := range pods { + uid := string(p.UID) // Pods that don't have a completion finalizer are in the uncounted set or // have already been accounted for in the Job status. 
- if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(string(p.UID))) { + if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid)) { continue } if isIndexedJob(job) { @@ -1538,3 +1600,11 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio } return list, false } + +func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { + completionMode := completionModeStr(job) + diff := job.Status.Succeeded - oldCounters.Succeeded + metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff)) + diff = job.Status.Failed - oldCounters.Failed + metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff)) +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index de9da1548b3..3631c1dae3b 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -47,8 +47,10 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" + metricstestutil "k8s.io/component-base/metrics/testutil" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" @@ -114,6 +116,7 @@ func newPod(name string, job *batch.Job) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, + UID: types.UID(name), Labels: job.Spec.Selector.MatchLabels, Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, @@ -1001,10 +1004,11 @@ func TestSyncJobLegacyTracking(t *testing.T) { func TestGetStatus(t *testing.T) { cases := map[string]struct { - job batch.Job - pods []*v1.Pod - wantSucceeded int32 - wantFailed int32 + job batch.Job + pods []*v1.Pod + expectedRmFinalizers sets.String + wantSucceeded int32 + wantFailed int32 }{ "without finalizers": { job: batch.Job{ @@ -1066,6 +1070,30 @@ func TestGetStatus(t *testing.T) { wantSucceeded: 4, wantFailed: 4, }, + "with expected removed finalizers": { + job: batch.Job{ + Status: batch.JobStatus{ + Succeeded: 2, + Failed: 2, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a"}, + Failed: []types.UID{"d"}, + }, + }, + }, + expectedRmFinalizers: sets.NewString("b", "f"), + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodSucceeded).Pod, + buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("d").phase(v1.PodFailed).Pod, + buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod, + }, + wantSucceeded: 4, + wantFailed: 5, + }, "deleted pods": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod, @@ -1102,7 +1130,7 @@ func TestGetStatus(t *testing.T) { if tc.job.Status.UncountedTerminatedPods != nil { uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) } - succeeded, failed := getStatus(&tc.job, tc.pods, uncounted) + succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers) if succeeded != tc.wantSucceeded { t.Errorf("getStatus reports %d succeeded pods, want %d", 
succeeded, tc.wantSucceeded) } @@ -1119,15 +1147,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { indexedCompletion := batch.IndexedCompletion mockErr := errors.New("mock error") cases := map[string]struct { - job batch.Job - pods []*v1.Pod - finishedCond *batch.JobCondition - needsFlush bool - statusUpdateErr error - podControlErr error - wantErr error - wantRmFinalizers int - wantStatusUpdates []batch.JobStatus + job batch.Job + pods []*v1.Pod + finishedCond *batch.JobCondition + expectedRmFinalizers sets.String + needsFlush bool + statusUpdateErr error + podControlErr error + wantErr error + wantRmFinalizers int + wantStatusUpdates []batch.JobStatus }{ "no updates": {}, "new active": { @@ -1209,6 +1238,45 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, }, + "expecting removed finalizers": { + job: batch.Job{ + Status: batch.JobStatus{ + Succeeded: 2, + Failed: 3, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a", "g"}, + Failed: []types.UID{"b", "h"}, + }, + }, + }, + expectedRmFinalizers: sets.NewString("c", "d", "g", "h"), + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod, + }, + wantRmFinalizers: 4, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a", "e"}, + Failed: []types.UID{"b", "f"}, + }, + Succeeded: 3, + Failed: 4, + }, + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 5, + Failed: 6, + }, + }, + }, "succeeding job": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, @@ -1462,7 +1530,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 499, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ @@ -1479,17 +1547,11 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"499"}, - Failed: []types.UID{"b"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, - { - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 500, - Failed: 2, - }, }, }, "too many indexed finished": { @@ -1506,18 +1568,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 500, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, CompletedIndexes: "0-499", Succeeded: 500, }, - { - CompletedIndexes: "0-500", - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 501, - }, }, }, } @@ -1526,19 +1583,20 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) 
manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControlErr} + metrics.JobPodsFinished.Reset() manager.podControl = &fakePodControl var statusUpdates []batch.JobStatus manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) return job, tc.statusUpdateErr } - - if tc.job.Status.UncountedTerminatedPods == nil { - tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} + job := tc.job.DeepCopy() + if job.Status.UncountedTerminatedPods == nil { + job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } - uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) - succeededIndexes := succeededIndexesFromJob(&tc.job) - err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) + uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) + succeededIndexes := succeededIndexesFromJob(job) + err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) } @@ -1549,6 +1607,25 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if rmFinalizers != tc.wantRmFinalizers { t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers) } + if tc.wantErr == nil { + completionMode := completionModeStr(job) + v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded)) + if err != nil { + t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err) + } + newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded + if float64(newSucceeded) != v { + t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded) + } + v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed)) + if err != nil { + t.Fatalf("Obtaining failed job_pods_finished_total: %v", err) + } + newFailed := job.Status.Failed - tc.job.Status.Failed + if float64(newFailed) != v { + t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed) + } + } }) } } @@ -2274,7 +2351,7 @@ func TestDeletePod(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) - jm.deletePod(pod1) + jm.deletePod(pod1, true) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -2287,7 +2364,7 @@ func TestDeletePod(t *testing.T) { t.Errorf("queue.Get() = %v, want %v", got, want) } - jm.deletePod(pod2) + jm.deletePod(pod2, true) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -2322,7 +2399,7 @@ func TestDeletePodOrphan(t *testing.T) { pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) - jm.deletePod(pod1) + jm.deletePod(pod1, true) if got, want := jm.queue.Len(), 0; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -2966,6 +3043,105 @@ func TestEnsureJobConditions(t *testing.T) { } } +func TestFinalizersRemovedExpectations(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() + clientset := 
fake.NewSimpleClientset() + sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) + manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + return job, nil + } + + job := newJob(2, 2, 6, batch.NonIndexedCompletion) + job.Annotations = map[string]string{ + batch.JobTrackingFinalizer: "", + } + sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) + podInformer := sharedInformers.Core().V1().Pods().Informer() + podIndexer := podInformer.GetIndexer() + uids := sets.NewString() + for i := range pods { + clientset.Tracker().Add(&pods[i]) + podIndexer.Add(&pods[i]) + uids.Insert(string(pods[i].UID)) + } + jobKey := testutil.GetKey(job, t) + + manager.syncJob(jobKey) + gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) + if len(gotExpectedUIDs) != 0 { + t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List()) + } + + // Remove failures and re-sync. + manager.podControl.(*controller.FakePodControl).Err = nil + manager.syncJob(jobKey) + gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) + if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { + t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) + cache.WaitForCacheSync(stopCh, podInformer.HasSynced) + + // Make sure the first syncJob sets the expectations, even after the caches synced. + gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) + if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { + t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff) + } + + // Change pods in different ways. + + podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + + update := pods[0].DeepCopy() + update.Finalizers = nil + update.ResourceVersion = "1" + err := clientset.Tracker().Update(podsResource, update, update.Namespace) + if err != nil { + t.Errorf("Removing finalizer: %v", err) + } + + update = pods[1].DeepCopy() + update.Finalizers = nil + update.DeletionTimestamp = &metav1.Time{Time: time.Now()} + update.ResourceVersion = "1" + err = clientset.Tracker().Update(podsResource, update, update.Namespace) + if err != nil { + t.Errorf("Removing finalizer and setting deletion timestamp: %v", err) + } + + // Preserve the finalizer. 
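+	// Only the deletion timestamp is set, so the controller should keep
+	// expecting the removal of this Pod's finalizer.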
+ update = pods[2].DeepCopy() + update.DeletionTimestamp = &metav1.Time{Time: time.Now()} + update.ResourceVersion = "1" + err = clientset.Tracker().Update(podsResource, update, update.Namespace) + if err != nil { + t.Errorf("Setting deletion timestamp: %v", err) + } + + err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name) + if err != nil { + t.Errorf("Deleting pod that had finalizer: %v", err) + } + + uids = sets.NewString(string(pods[2].UID)) + var diff string + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) + diff = cmp.Diff(uids, gotExpectedUIDs) + return diff == "", nil + }); err != nil { + t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff) + } +} + func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { t.Helper() want := []v1.EnvVar{ diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index b2517a09f42..b9271426661 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -68,10 +68,26 @@ var ( }, []string{"completion_mode", "result"}, ) + + // JobPodsFinished records the number of finished Pods that the job controller + // finished tracking. + // It only applies to Jobs that were created while the feature gate + // JobTrackingWithFinalizers was enabled. + // Possible label values: + // completion_mode: Indexed, NonIndexed + // result: failed, succeeded + JobPodsFinished = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: JobControllerSubsystem, + Name: "job_pods_finished_total", + Help: "The number of finished Pods that are fully tracked", + }, + []string{"completion_mode", "result"}) ) -// Possible values for the "action" label in the above metrics. const ( + // Possible values for the "action" label in the above metrics. + // JobSyncActionReconciling when the Job's pod creation/deletion expectations // are unsatisfied and the controller is waiting for issued Pod // creation/deletions to complete. @@ -88,6 +104,11 @@ const ( // if a Job is suspended or if the number of active Pods is more than // parallelism. JobSyncActionPodsDeleted = "pods_deleted" + + // Possible values for "result" label in the above metrics. + + Succeeded = "succeeded" + Failed = "failed" ) var registerMetrics sync.Once @@ -98,5 +119,6 @@ func Register() { legacyregistry.MustRegister(JobSyncDurationSeconds) legacyregistry.MustRegister(JobSyncNum) legacyregistry.MustRegister(JobFinishedNum) + legacyregistry.MustRegister(JobPodsFinished) }) } diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go new file mode 100644 index 00000000000..55996a2a2b8 --- /dev/null +++ b/pkg/controller/job/tracking_utils.go @@ -0,0 +1,117 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package job
+
+import (
+	"fmt"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+)
+
+// uidSetKeyFunc parses out the key from a uidSet.
+var uidSetKeyFunc = func(obj interface{}) (string, error) {
+	if u, ok := obj.(*uidSet); ok {
+		return u.key, nil
+	}
+	return "", fmt.Errorf("could not find key for obj %#v", obj)
+}
+
+// uidSet holds a key and a set of UIDs. Used by uidTrackingExpectations
+// to remember which UIDs it has seen or is still waiting for.
+type uidSet struct {
+	sync.RWMutex
+	set sets.String
+	key string
+}
+
+// uidTrackingExpectations tracks the UIDs of Pods the controller is waiting to
+// observe tracking finalizer deletions.
+type uidTrackingExpectations struct {
+	store cache.Store
+}
+
+// getSet returns the uidSet for the given controller key, or nil if it
+// doesn't exist. Callers must hold the set's own lock to access or modify
+// its contents.
+func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet {
+	if obj, exists, err := u.store.GetByKey(controllerKey); err == nil && exists {
+		return obj.(*uidSet)
+	}
+	return nil
+}
+
+// getExpectedUIDs returns a copy of the expected UIDs for the given
+// controller key, or nil if no expectations are recorded for it.
+func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.String {
+	uids := u.getSet(controllerKey)
+	if uids == nil {
+		return nil
+	}
+	uids.RLock()
+	set := sets.NewString(uids.set.UnsortedList()...)
+	uids.RUnlock()
+	return set
+}
+
+// expectFinalizersRemoved records the given Pod UIDs as pending tracking
+// finalizer removals, against the given job key.
+// This is thread-safe across different job keys.
+func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deletedKeys []string) error {
+	klog.V(4).InfoS("Expecting tracking finalizers removed", "job", jobKey, "podUIDs", deletedKeys)
+
+	uids := u.getSet(jobKey)
+	if uids == nil {
+		uids = &uidSet{
+			key: jobKey,
+			set: sets.NewString(),
+		}
+		if err := u.store.Add(uids); err != nil {
+			return err
+		}
+	}
+	uids.Lock()
+	uids.set.Insert(deletedKeys...)
+	uids.Unlock()
+	return nil
+}
+
+// finalizerRemovalObserved records that the tracking finalizer was removed
+// from the Pod with the given UID, for the given job.
+func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey string) {
+	uids := u.getSet(jobKey)
+	if uids != nil {
+		uids.Lock()
+		if uids.set.Has(deleteKey) {
+			klog.V(4).InfoS("Observed tracking finalizer removed", "job", jobKey, "podUID", deleteKey)
+			uids.set.Delete(deleteKey)
+		}
+		uids.Unlock()
+	}
+}
+
+// deleteExpectations removes the UID set stored for the given job key.
+func (u *uidTrackingExpectations) deleteExpectations(jobKey string) {
+	if err := u.store.Delete(jobKey); err != nil {
+		klog.ErrorS(err, "deleting tracking finalizer UID expectations", "job", jobKey)
+	}
+}
+
+// newUIDTrackingExpectations returns an empty uidTrackingExpectations store,
+// keyed by job key.
+func newUIDTrackingExpectations() *uidTrackingExpectations {
+	return &uidTrackingExpectations{store: cache.NewStore(uidSetKeyFunc)}
+}
diff --git a/pkg/controller/job/tracking_utils_test.go b/pkg/controller/job/tracking_utils_test.go
new file mode 100644
index 00000000000..1fc595271d0
--- /dev/null
+++ b/pkg/controller/job/tracking_utils_test.go
@@ -0,0 +1,111 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "sync" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestUIDTrackingExpectations(t *testing.T) { + tracks := []struct { + job string + firstRound []string + secondRound []string + }{ + { + job: "foo", + firstRound: []string{"a", "b", "c", "d"}, + secondRound: []string{"e", "f"}, + }, + { + job: "bar", + firstRound: []string{"x", "y", "z"}, + secondRound: []string{"u", "v", "w"}, + }, + { + job: "baz", + firstRound: []string{"w"}, + secondRound: []string{"a"}, + }, + } + expectations := newUIDTrackingExpectations() + + // Insert first round of keys in parallel. + + var wg sync.WaitGroup + wg.Add(len(tracks)) + errs := make([]error, len(tracks)) + for i := range tracks { + track := tracks[i] + go func(errID int) { + errs[errID] = expectations.expectFinalizersRemoved(track.job, track.firstRound) + wg.Done() + }(i) + } + wg.Wait() + for i, err := range errs { + if err != nil { + t.Errorf("Failed adding first round of UIDs for job %s: %v", tracks[i].job, err) + } + } + + for _, track := range tracks { + uids := expectations.getSet(track.job) + if uids == nil { + t.Errorf("Set of UIDs is empty for job %s", track.job) + } else if diff := cmp.Diff(track.firstRound, uids.set.List()); diff != "" { + t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff) + } + } + + // Delete the first round of keys and add the second round in parallel. + + for i, track := range tracks { + wg.Add(len(track.firstRound) + 1) + track := track + for _, uid := range track.firstRound { + uid := uid + go func() { + expectations.finalizerRemovalObserved(track.job, uid) + wg.Done() + }() + } + go func(errID int) { + errs[errID] = expectations.expectFinalizersRemoved(track.job, track.secondRound) + wg.Done() + }(i) + } + wg.Wait() + + for i, err := range errs { + if err != nil { + t.Errorf("Failed adding second round of UIDs for job %s: %v", tracks[i].job, err) + } + } + + for _, track := range tracks { + uids := expectations.getSet(track.job) + if uids == nil { + t.Errorf("Set of UIDs is empty for job %s", track.job) + } else if diff := cmp.Diff(track.secondRound, uids.set.List()); diff != "" { + t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff) + } + } +} diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4cddc6c7669..0a599fb4043 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "strconv" + "sync" "testing" "time" @@ -220,6 +221,10 @@ func TestParallelJobParallelism(t *testing.T) { } func TestParallelJobWithCompletions(t *testing.T) { + // Lower limits for a job sync so that we can test partial updates with a low + // number of pods. 
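+	// With 54 parallel Pods and 56 completions below, a limit of 10 forces
+	// the controller to spread Pod creations and status updates over
+	// several syncs.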
+ t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10)) + t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10)) for _, wFinalizers := range []bool{false, true} { t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() @@ -230,8 +235,8 @@ func TestParallelJobWithCompletions(t *testing.T) { jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(4), - Completions: pointer.Int32Ptr(6), + Parallelism: pointer.Int32Ptr(54), + Completions: pointer.Int32Ptr(56), }, }) if err != nil { @@ -241,23 +246,23 @@ func TestParallelJobWithCompletions(t *testing.T) { t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 54, }, wFinalizers) // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 54, Failed: 2, }, wFinalizers) // Pods are created until the number of succeeded Pods equals completions. - if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 3, + Succeeded: 53, Active: 3, }, wFinalizers) // No more Pods are created after the Job completes. 
@@ -267,7 +272,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 6, + Succeeded: 56, }, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -781,22 +786,44 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj if err != nil { return fmt.Errorf("listing Job Pods: %w", err) } + updates := make([]v1.Pod, 0, cnt) for _, pod := range pods.Items { - if cnt == 0 { + if len(updates) == cnt { break } if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded { pod.Status.Phase = phase - _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("updating Pod status: %w", err) - } - cnt-- + updates = append(updates, pod) } } - if cnt != 0 { + if len(updates) != cnt { return fmt.Errorf("couldn't set phase on %d Job Pods", cnt) } + return updatePodStatuses(ctx, clientSet, updates) +} + +func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error { + wg := sync.WaitGroup{} + wg.Add(len(updates)) + errCh := make(chan error, len(updates)) + + for _, pod := range updates { + pod := pod + go func() { + _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{}) + if err != nil { + errCh <- err + } + wg.Done() + }() + } + wg.Wait() + + select { + case err := <-errCh: + return fmt.Errorf("updating Pod status: %w", err) + default: + } return nil } @@ -860,7 +887,11 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) - config := restclient.Config{Host: server.URL} + config := restclient.Config{ + Host: server.URL, + QPS: 200.0, + Burst: 200, + } clientSet, err := clientset.NewForConfig(&config) if err != nil { t.Fatalf("Error creating clientset: %v", err) @@ -899,3 +930,11 @@ func hasJobTrackingAnnotation(job *batchv1.Job) bool { _, ok := job.Annotations[batchv1.JobTrackingFinalizer] return ok } + +func setDuringTest(val *int, newVal int) func() { + origVal := *val + *val = newVal + return func() { + *val = origVal + } +}
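
Reviewer note, not part of the patch: a minimal sketch of the expectation lifecycle introduced above, assuming the package-local uidTrackingExpectations from tracking_utils.go (the job key and Pod UIDs are hypothetical):

func exampleExpectationLifecycle() {
	exp := newUIDTrackingExpectations()
	jobKey := "default/sample-job" // hypothetical job key

	// syncJob (via removeTrackingFinalizerFromPods) records the UIDs it is
	// about to patch.
	if err := exp.expectFinalizersRemoved(jobKey, []string{"uid-a", "uid-b"}); err != nil {
		return
	}

	// The Pod event handlers report each removal as it is observed through
	// the informer.
	exp.finalizerRemovalObserved(jobKey, "uid-a")

	// The next sync excludes still-expected UIDs from the status counters.
	_ = exp.getExpectedUIDs(jobKey) // contains only "uid-b"
}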