Count ready pods in job controller

When the feature gate JobReadyPods is enabled, count the pods of a Job that have the Ready condition and publish the count in the Job's status.
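For consumers, the count surfaces as the new job.Status.Ready pointer field. A minimal client-go sketch of reading it (the namespace, job name, and kubeconfig wiring are illustrative assumptions, not part of this change):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a cluster reachable through the default kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)
	job, err := client.BatchV1().Jobs("default").Get(context.TODO(), "my-job", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// Status.Ready is a *int32; it stays nil while the gate is disabled.
	if job.Status.Ready != nil {
		fmt.Printf("ready pods: %d\n", *job.Status.Ready)
	}
}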

Change-Id: I86f93914568de6a7029f9ae92ee7b749686fbf97
Author: Aldo Culquicondor
Date: 2021-09-09 11:00:05 -04:00
parent: 1bff5eb44d
commit: 60fc90967b
2 changed files with 121 additions and 50 deletions

File 1 of 2

@@ -49,12 +49,18 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog/v2"
+podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/integer"
+"k8s.io/utils/pointer"
)
+// podUpdateBatchPeriod is the batch period to hold pod updates before syncing
+// a Job. It is used if the feature gate JobReadyPods is enabled.
+const podUpdateBatchPeriod = 500 * time.Millisecond
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
@@ -110,6 +116,8 @@ type Controller struct {
orphanQueue workqueue.RateLimitingInterface
recorder record.EventRecorder
+podUpdateBatchPeriod time.Duration
}
// NewController creates a new Job controller that keeps the relevant pods
@@ -135,6 +143,9 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
}
+if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
+jm.podUpdateBatchPeriod = podUpdateBatchPeriod
+}
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -249,7 +260,7 @@ func (jm *Controller) addPod(obj interface{}) {
return
}
jm.expectations.CreationObserved(jobKey)
-jm.enqueueController(job, true)
+jm.enqueueControllerPodUpdate(job, true)
return
}
@@ -258,7 +269,7 @@ func (jm *Controller) addPod(obj interface{}) {
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
-jm.enqueueController(job, true)
+jm.enqueueControllerPodUpdate(job, true)
}
}
@@ -303,7 +314,7 @@ func (jm *Controller) updatePod(old, cur interface{}) {
jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
}
}
-jm.enqueueController(job, immediate)
+jm.enqueueControllerPodUpdate(job, immediate)
}
}
@@ -319,7 +330,7 @@ func (jm *Controller) updatePod(old, cur interface{}) {
jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID))
}
}
-jm.enqueueController(job, immediate)
+jm.enqueueControllerPodUpdate(job, immediate)
return
}
@@ -328,7 +339,7 @@ func (jm *Controller) updatePod(old, cur interface{}) {
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
-jm.enqueueController(job, immediate)
+jm.enqueueControllerPodUpdate(job, immediate)
}
}
}
@@ -379,7 +390,7 @@ func (jm *Controller) deletePod(obj interface{}, final bool) {
jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID))
}
-jm.enqueueController(job, true)
+jm.enqueueControllerPodUpdate(job, true)
}
func (jm *Controller) updateJob(old, cur interface{}) {
@@ -415,13 +426,21 @@ func (jm *Controller) updateJob(old, cur interface{}) {
// immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run.
func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
+jm.enqueueControllerDelayed(obj, immediate, 0)
+}
+func (jm *Controller) enqueueControllerPodUpdate(obj interface{}, immediate bool) {
+jm.enqueueControllerDelayed(obj, immediate, jm.podUpdateBatchPeriod)
+}
+func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool, delay time.Duration) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
-backoff := time.Duration(0)
+backoff := delay
if !immediate {
backoff = getBackoff(jm.queue, key)
}
@@ -670,6 +689,10 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
activePods := controller.FilterActivePods(pods)
active := int32(len(activePods))
succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers)
+var ready *int32
+if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
+ready = pointer.Int32(countReadyPods(activePods))
+}
// Job first start. Set StartTime and start the ActiveDeadlineSeconds timer
// only if the job is not in the suspended state.
if job.Status.StartTime == nil && !jobSuspended(&job) {
@@ -787,8 +810,9 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
}
if uncounted != nil {
-needsStatusUpdate := suspendCondChanged || active != job.Status.Active
+needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !equalReady(ready, job.Status.Ready)
job.Status.Active = active
+job.Status.Ready = ready
err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
if err != nil {
return false, fmt.Errorf("tracking status: %w", err)
@@ -809,10 +833,11 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
}
// no need to update the job if the status hasn't changed since last time
-if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {
+if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || !equalReady(job.Status.Ready, ready) || suspendCondChanged || finishedCondition != nil {
job.Status.Active = active
job.Status.Succeeded = succeeded
job.Status.Failed = failed
+job.Status.Ready = ready
if isIndexedJob(&job) {
job.Status.CompletedIndexes = succeededIndexes.String()
}
@@ -1609,3 +1634,20 @@ func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
diff = job.Status.Failed - oldCounters.Failed
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
}
+func countReadyPods(pods []*v1.Pod) int32 {
+cnt := int32(0)
+for _, p := range pods {
+if podutil.IsPodReady(p) {
+cnt++
+}
+}
+return cnt
+}
+func equalReady(a, b *int32) bool {
+if a != nil && b != nil {
+return *a == *b
+}
+return a == b
+}
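
The enqueueControllerPodUpdate path above leans on the workqueue deduplicating keys: every pod event inside the 500ms window resolves to the same Job key, so only one sync fires. A standalone sketch of that behavior (the queue and key here are illustrative, not the controller's actual wiring):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()
	// Three pod updates for the same Job arrive inside the batch period.
	for i := 0; i < 3; i++ {
		q.AddAfter("default/my-job", 500*time.Millisecond)
	}
	time.Sleep(600 * time.Millisecond)
	// The key was deduplicated, so a single sync is queued.
	fmt.Println("queued syncs:", q.Len())
}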

File 2 of 2

@@ -125,31 +125,41 @@ func newPod(name string, job *batch.Job) *v1.Pod {
}
// create count pods with the given phase for the given job
-func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
-pods := []v1.Pod{}
-for i := int32(0); i < count; i++ {
+func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
+var pods []*v1.Pod
+for i := 0; i < count; i++ {
newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
newPod.Status = v1.PodStatus{Phase: status}
if trackingUncountedPods(job) {
newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
}
-pods = append(pods, *newPod)
+pods = append(pods, newPod)
}
return pods
}
-func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods int32) {
+func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, readyPods int) {
for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
-podIndexer.Add(&pod)
+podIndexer.Add(pod)
}
-for _, pod := range newPodList(activePods, v1.PodRunning, job) {
-podIndexer.Add(&pod)
+running := newPodList(activePods, v1.PodRunning, job)
+for i, p := range running {
+if i >= readyPods {
+break
+}
+p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{
+Type: v1.PodReady,
+Status: v1.ConditionTrue,
+})
+}
+for _, pod := range running {
+podIndexer.Add(pod)
}
for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) {
-podIndexer.Add(&pod)
+podIndexer.Add(pod)
}
for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
-podIndexer.Add(&pod)
+podIndexer.Add(pod)
}
}
@@ -189,10 +199,11 @@ func TestControllerSyncJob(t *testing.T) {
// pod setup
podControllerError error
jobKeyForget bool
-pendingPods int32
-activePods int32
-succeededPods int32
-failedPods int32
+pendingPods int
+activePods int
+readyPods int
+succeededPods int
+failedPods int
podsWithIndexes []indexPhase
fakeExpectationAtCreation int32 // negative: ExpectDeletions, positive: ExpectCreations
@@ -200,6 +211,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedCreations int32
expectedDeletions int32
expectedActive int32
+expectedReady *int32
expectedSucceeded int32
expectedCompletedIdxs string
expectedFailed int32
@@ -212,8 +224,9 @@ func TestControllerSyncJob(t *testing.T) {
expectedPodPatches int
// features
-indexedJobEnabled bool
-suspendJobEnabled bool
+indexedJobEnabled bool
+suspendJobEnabled bool
+jobReadyPodsEnabled bool
}{
"job start": {
parallelism: 2,
@@ -240,12 +253,24 @@ func TestControllerSyncJob(t *testing.T) {
expectedActive: 2,
},
"correct # of pods": {
-parallelism: 2,
+parallelism: 3,
completions: 5,
backoffLimit: 6,
jobKeyForget: true,
-activePods: 2,
-expectedActive: 2,
+activePods: 3,
+readyPods: 2,
+expectedActive: 3,
},
+"correct # of pods, ready enabled": {
+parallelism: 3,
+completions: 5,
+backoffLimit: 6,
+jobKeyForget: true,
+activePods: 3,
+readyPods: 2,
+expectedActive: 3,
+expectedReady: pointer.Int32(2),
+jobReadyPodsEnabled: true,
+},
"WQ job: correct # of pods": {
parallelism: 2,
@@ -709,6 +734,7 @@ func TestControllerSyncJob(t *testing.T) {
}
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, tc.indexedJobEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, tc.suspendJobEnabled)()
+defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
// job manager setup
@@ -745,7 +771,7 @@ func TestControllerSyncJob(t *testing.T) {
}
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods)
+setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods)
setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
actual := job
@@ -822,6 +848,9 @@ func TestControllerSyncJob(t *testing.T) {
if actual.Status.Active != tc.expectedActive {
t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
}
+if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" {
+t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff)
+}
if actual.Status.Succeeded != tc.expectedSucceeded {
t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
}
@@ -1655,9 +1684,9 @@ func TestSyncJobPastDeadline(t *testing.T) {
suspend bool
// pod setup
-activePods int32
-succeededPods int32
-failedPods int32
+activePods int
+succeededPods int
+failedPods int
// expectations
expectedForGetKey bool
@@ -1759,7 +1788,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
job.Status.StartTime = &start
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods)
+setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0)
// run
forget, err := manager.syncJob(testutil.GetKey(job, t))
@@ -2447,14 +2476,14 @@ func TestSyncJobExpectations(t *testing.T) {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
pods := newPodList(2, v1.PodPending, job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-podIndexer.Add(&pods[0])
+podIndexer.Add(pods[0])
manager.expectations = FakeJobExpectations{
controller.NewControllerExpectations(), true, func() {
// If we check active pods before checking expectations, the job
// will create a new replica because it doesn't see this pod, but
// has fulfilled its expectations.
-podIndexer.Add(&pods[1])
+podIndexer.Add(pods[1])
},
}
manager.syncJob(testutil.GetKey(job, t))
@@ -2551,7 +2580,7 @@ func TestWatchPods(t *testing.T) {
pods := newPodList(1, v1.PodRunning, testJob)
testPod := pods[0]
testPod.Status.Phase = v1.PodFailed
-fakeWatch.Add(&testPod)
+fakeWatch.Add(testPod)
t.Log("Waiting for pod to reach syncHandler")
<-received
@@ -2604,10 +2633,10 @@ func bumpResourceVersion(obj metav1.Object) {
}
type pods struct {
-pending int32
-active int32
-succeed int32
-failed int32
+pending int
+active int
+succeed int
+failed int
}
func TestJobBackoffReset(t *testing.T) {
@@ -2656,7 +2685,7 @@ func TestJobBackoffReset(t *testing.T) {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed)
+setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0)
manager.queue.Add(key)
manager.processNextWorkItem()
retries := manager.queue.NumRequeues(key)
@@ -2666,7 +2695,7 @@ func TestJobBackoffReset(t *testing.T) {
job = actual
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
-setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed)
+setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0)
manager.processNextWorkItem()
retries = manager.queue.NumRequeues(key)
if retries != 0 {
@@ -2835,9 +2864,9 @@ func TestJobBackoffForOnFailure(t *testing.T) {
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-for i, pod := range newPodList(int32(len(tc.restartCounts)), tc.podPhase, job) {
+for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) {
pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
-podIndexer.Add(&pod)
+podIndexer.Add(pod)
}
// run
@@ -2878,8 +2907,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
// pod setup
activePodsPhase v1.PodPhase
-activePods int32
-failedPods int32
+activePods int
+failedPods int
// expectations
isExpectingAnError bool
@@ -2938,10 +2967,10 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) {
-podIndexer.Add(&pod)
+podIndexer.Add(pod)
}
for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) {
-podIndexer.Add(&pod)
+podIndexer.Add(pod)
}
// run
@@ -3079,8 +3108,8 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
podIndexer := podInformer.GetIndexer()
uids := sets.NewString()
for i := range pods {
-clientset.Tracker().Add(&pods[i])
-podIndexer.Add(&pods[i])
+clientset.Tracker().Add(pods[i])
+podIndexer.Add(pods[i])
uids.Insert(string(pods[i].UID))
}
jobKey := testutil.GetKey(job, t)
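
The new countReadyPods in the first file depends on podutil.IsPodReady, and the tests above mark pods ready by appending a PodReady condition. A minimal sketch mirroring (not copying) that helper's check, to make the condition explicit:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// isPodReady mirrors the semantics of podutil.IsPodReady: a pod is ready
// when its PodReady condition reports ConditionTrue.
func isPodReady(pod *v1.Pod) bool {
	for _, c := range pod.Status.Conditions {
		if c.Type == v1.PodReady {
			return c.Status == v1.ConditionTrue
		}
	}
	return false
}

func main() {
	pod := &v1.Pod{Status: v1.PodStatus{Conditions: []v1.PodCondition{
		{Type: v1.PodReady, Status: v1.ConditionTrue},
	}}}
	fmt.Println(isPodReady(pod)) // true
}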