From 60fc90967bc65bfe6a66e5dd1a78d3e1484717c4 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 9 Sep 2021 11:00:05 -0400 Subject: [PATCH] Count ready pods in job controller When the feature gate JobReadyPods is enabled. Change-Id: I86f93914568de6a7029f9ae92ee7b749686fbf97 --- pkg/controller/job/job_controller.go | 60 ++++++++++-- pkg/controller/job/job_controller_test.go | 111 ++++++++++++++-------- 2 files changed, 121 insertions(+), 50 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index fb6baf85274..a61b74b2e7f 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -49,12 +49,18 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog/v2" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/integer" + "k8s.io/utils/pointer" ) +// podUpdateBatchPeriod is the batch period to hold pod updates before syncing +// a Job. It is used if the feature gate JobReadyPods is enabled. +const podUpdateBatchPeriod = 500 * time.Millisecond + // controllerKind contains the schema.GroupVersionKind for this controller type. var controllerKind = batch.SchemeGroupVersion.WithKind("Job") @@ -110,6 +116,8 @@ type Controller struct { orphanQueue workqueue.RateLimitingInterface recorder record.EventRecorder + + podUpdateBatchPeriod time.Duration } // NewController creates a new Job controller that keeps the relevant pods @@ -135,6 +143,9 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), } + if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { + jm.podUpdateBatchPeriod = podUpdateBatchPeriod + } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -249,7 +260,7 @@ func (jm *Controller) addPod(obj interface{}) { return } jm.expectations.CreationObserved(jobKey) - jm.enqueueController(job, true) + jm.enqueueControllerPodUpdate(job, true) return } @@ -258,7 +269,7 @@ func (jm *Controller) addPod(obj interface{}) { // DO NOT observe creation because no controller should be waiting for an // orphan. for _, job := range jm.getPodJobs(pod) { - jm.enqueueController(job, true) + jm.enqueueControllerPodUpdate(job, true) } } @@ -303,7 +314,7 @@ func (jm *Controller) updatePod(old, cur interface{}) { jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) } } - jm.enqueueController(job, immediate) + jm.enqueueControllerPodUpdate(job, immediate) } } @@ -319,7 +330,7 @@ func (jm *Controller) updatePod(old, cur interface{}) { jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) } } - jm.enqueueController(job, immediate) + jm.enqueueControllerPodUpdate(job, immediate) return } @@ -328,7 +339,7 @@ func (jm *Controller) updatePod(old, cur interface{}) { labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) if labelChanged || controllerRefChanged { for _, job := range jm.getPodJobs(curPod) { - jm.enqueueController(job, immediate) + jm.enqueueControllerPodUpdate(job, immediate) } } } @@ -379,7 +390,7 @@ func (jm *Controller) deletePod(obj interface{}, final bool) { jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) } - jm.enqueueController(job, true) + jm.enqueueControllerPodUpdate(job, true) } func (jm *Controller) updateJob(old, cur interface{}) { @@ -415,13 +426,21 @@ func (jm *Controller) updateJob(old, cur interface{}) { // immediate tells the controller to update the status right away, and should // happen ONLY when there was a successful pod run. func (jm *Controller) enqueueController(obj interface{}, immediate bool) { + jm.enqueueControllerDelayed(obj, immediate, 0) +} + +func (jm *Controller) enqueueControllerPodUpdate(obj interface{}, immediate bool) { + jm.enqueueControllerDelayed(obj, immediate, jm.podUpdateBatchPeriod) +} + +func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool, delay time.Duration) { key, err := controller.KeyFunc(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } - backoff := time.Duration(0) + backoff := delay if !immediate { backoff = getBackoff(jm.queue, key) } @@ -670,6 +689,10 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers) + var ready *int32 + if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { + ready = pointer.Int32(countReadyPods(activePods)) + } // Job first start. Set StartTime and start the ActiveDeadlineSeconds timer // only if the job is not in the suspended state. if job.Status.StartTime == nil && !jobSuspended(&job) { @@ -787,8 +810,9 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { } if uncounted != nil { - needsStatusUpdate := suspendCondChanged || active != job.Status.Active + needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready) job.Status.Active = active + job.Status.Ready = ready err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) if err != nil { return false, fmt.Errorf("tracking status: %w", err) @@ -809,10 +833,11 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { } // no need to update the job if the status hasn't changed since last time - if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil { + if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || !equalReady(job.Status.Ready, ready) || suspendCondChanged || finishedCondition != nil { job.Status.Active = active job.Status.Succeeded = succeeded job.Status.Failed = failed + job.Status.Ready = ready if isIndexedJob(&job) { job.Status.CompletedIndexes = succeededIndexes.String() } @@ -1609,3 +1634,20 @@ func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { diff = job.Status.Failed - oldCounters.Failed metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff)) } + +func countReadyPods(pods []*v1.Pod) int32 { + cnt := int32(0) + for _, p := range pods { + if podutil.IsPodReady(p) { + cnt++ + } + } + return cnt +} + +func equalReady(a, b *int32) bool { + if a != nil && b != nil { + return *a == *b + } + return a == b +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 49be41684e8..52274b1f293 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -125,31 +125,41 @@ func newPod(name string, job *batch.Job) *v1.Pod { } // create count pods with the given phase for the given job -func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod { - pods := []v1.Pod{} - for i := int32(0); i < count; i++ { +func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod { + var pods []*v1.Pod + for i := 0; i < count; i++ { newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) newPod.Status = v1.PodStatus{Phase: status} if trackingUncountedPods(job) { newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer) } - pods = append(pods, *newPod) + pods = append(pods, newPod) } return pods } -func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods int32) { +func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, readyPods int) { for _, pod := range newPodList(pendingPods, v1.PodPending, job) { - podIndexer.Add(&pod) + podIndexer.Add(pod) } - for _, pod := range newPodList(activePods, v1.PodRunning, job) { - podIndexer.Add(&pod) + running := newPodList(activePods, v1.PodRunning, job) + for i, p := range running { + if i >= readyPods { + break + } + p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }) + } + for _, pod := range running { + podIndexer.Add(pod) } for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) { - podIndexer.Add(&pod) + podIndexer.Add(pod) } for _, pod := range newPodList(failedPods, v1.PodFailed, job) { - podIndexer.Add(&pod) + podIndexer.Add(pod) } } @@ -189,10 +199,11 @@ func TestControllerSyncJob(t *testing.T) { // pod setup podControllerError error jobKeyForget bool - pendingPods int32 - activePods int32 - succeededPods int32 - failedPods int32 + pendingPods int + activePods int + readyPods int + succeededPods int + failedPods int podsWithIndexes []indexPhase fakeExpectationAtCreation int32 // negative: ExpectDeletions, positive: ExpectCreations @@ -200,6 +211,7 @@ func TestControllerSyncJob(t *testing.T) { expectedCreations int32 expectedDeletions int32 expectedActive int32 + expectedReady *int32 expectedSucceeded int32 expectedCompletedIdxs string expectedFailed int32 @@ -212,8 +224,9 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches int // features - indexedJobEnabled bool - suspendJobEnabled bool + indexedJobEnabled bool + suspendJobEnabled bool + jobReadyPodsEnabled bool }{ "job start": { parallelism: 2, @@ -240,12 +253,24 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 2, }, "correct # of pods": { - parallelism: 2, + parallelism: 3, completions: 5, backoffLimit: 6, jobKeyForget: true, - activePods: 2, - expectedActive: 2, + activePods: 3, + readyPods: 2, + expectedActive: 3, + }, + "correct # of pods, ready enabled": { + parallelism: 3, + completions: 5, + backoffLimit: 6, + jobKeyForget: true, + activePods: 3, + readyPods: 2, + expectedActive: 3, + expectedReady: pointer.Int32(2), + jobReadyPodsEnabled: true, }, "WQ job: correct # of pods": { parallelism: 2, @@ -709,6 +734,7 @@ func TestControllerSyncJob(t *testing.T) { } defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, tc.indexedJobEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, tc.suspendJobEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() // job manager setup @@ -745,7 +771,7 @@ func TestControllerSyncJob(t *testing.T) { } sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods) + setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods) setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes) actual := job @@ -822,6 +848,9 @@ func TestControllerSyncJob(t *testing.T) { if actual.Status.Active != tc.expectedActive { t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active) } + if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" { + t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff) + } if actual.Status.Succeeded != tc.expectedSucceeded { t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded) } @@ -1655,9 +1684,9 @@ func TestSyncJobPastDeadline(t *testing.T) { suspend bool // pod setup - activePods int32 - succeededPods int32 - failedPods int32 + activePods int + succeededPods int + failedPods int // expectations expectedForGetKey bool @@ -1759,7 +1788,7 @@ func TestSyncJobPastDeadline(t *testing.T) { job.Status.StartTime = &start sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods) + setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0) // run forget, err := manager.syncJob(testutil.GetKey(job, t)) @@ -2447,14 +2476,14 @@ func TestSyncJobExpectations(t *testing.T) { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) pods := newPodList(2, v1.PodPending, job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - podIndexer.Add(&pods[0]) + podIndexer.Add(pods[0]) manager.expectations = FakeJobExpectations{ controller.NewControllerExpectations(), true, func() { // If we check active pods before checking expectations, the job // will create a new replica because it doesn't see this pod, but // has fulfilled its expectations. - podIndexer.Add(&pods[1]) + podIndexer.Add(pods[1]) }, } manager.syncJob(testutil.GetKey(job, t)) @@ -2551,7 +2580,7 @@ func TestWatchPods(t *testing.T) { pods := newPodList(1, v1.PodRunning, testJob) testPod := pods[0] testPod.Status.Phase = v1.PodFailed - fakeWatch.Add(&testPod) + fakeWatch.Add(testPod) t.Log("Waiting for pod to reach syncHandler") <-received @@ -2604,10 +2633,10 @@ func bumpResourceVersion(obj metav1.Object) { } type pods struct { - pending int32 - active int32 - succeed int32 - failed int32 + pending int + active int + succeed int + failed int } func TestJobBackoffReset(t *testing.T) { @@ -2656,7 +2685,7 @@ func TestJobBackoffReset(t *testing.T) { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed) + setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0) manager.queue.Add(key) manager.processNextWorkItem() retries := manager.queue.NumRequeues(key) @@ -2666,7 +2695,7 @@ func TestJobBackoffReset(t *testing.T) { job = actual sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion) - setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed) + setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0) manager.processNextWorkItem() retries = manager.queue.NumRequeues(key) if retries != 0 { @@ -2835,9 +2864,9 @@ func TestJobBackoffForOnFailure(t *testing.T) { job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - for i, pod := range newPodList(int32(len(tc.restartCounts)), tc.podPhase, job) { + for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) { pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}} - podIndexer.Add(&pod) + podIndexer.Add(pod) } // run @@ -2878,8 +2907,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { // pod setup activePodsPhase v1.PodPhase - activePods int32 - failedPods int32 + activePods int + failedPods int // expectations isExpectingAnError bool @@ -2938,10 +2967,10 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) { - podIndexer.Add(&pod) + podIndexer.Add(pod) } for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) { - podIndexer.Add(&pod) + podIndexer.Add(pod) } // run @@ -3079,8 +3108,8 @@ func TestFinalizersRemovedExpectations(t *testing.T) { podIndexer := podInformer.GetIndexer() uids := sets.NewString() for i := range pods { - clientset.Tracker().Add(&pods[i]) - podIndexer.Add(&pods[i]) + clientset.Tracker().Add(pods[i]) + podIndexer.Add(pods[i]) uids.Insert(string(pods[i].UID)) } jobKey := testutil.GetKey(job, t)