diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index b04f803e0b7..49f42b18e61 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -26,8 +26,10 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/storage/names" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/features" ) const ( @@ -294,11 +296,17 @@ func addCompletionIndexEnvVariable(container *v1.Container) { return } } + var fieldPath string + if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) { + fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation) + } else { + fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation) + } container.Env = append(container.Env, v1.EnvVar{ Name: completionIndexEnvName, ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ - FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation), + FieldPath: fieldPath, }, }, }) @@ -311,6 +319,14 @@ func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) { template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index) } +func addCompletionIndexLabel(template *v1.PodTemplateSpec, index int) { + if template.Labels == nil { + template.Labels = make(map[string]string, 1) + } + // For consistency, we use the annotation batch.kubernetes.io/job-completion-index for the corresponding label as well. + template.Labels[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index) +} + func podGenerateNameWithIndex(jobName string, index int) string { appendIndex := "-" + strconv.Itoa(index) + "-" generateNamePrefix := jobName + appendIndex diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index fa8c2f9669e..77e1b642be8 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -1495,6 +1495,10 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn if completionIndex != unknownCompletionIndex { template = podTemplate.DeepCopy() addCompletionIndexAnnotation(template, completionIndex) + + if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) { + addCompletionIndexLabel(template, completionIndex) + } template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex) generateName = podGenerateNameWithIndex(job.Name, completionIndex) } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 4095a0c33e0..80e5fadadae 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -274,7 +274,8 @@ func TestControllerSyncJob(t *testing.T) { expectedPodPatches int // features - jobReadyPodsEnabled bool + jobReadyPodsEnabled bool + podIndexLabelDisabled bool }{ "job start": { parallelism: 2, @@ -781,12 +782,23 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 2, expectedPodPatches: 2, }, + "indexed job with podIndexLabel feature disabled": { + parallelism: 2, + completions: 5, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + expectedCreations: 2, + expectedActive: 2, + expectedCreatedIndexes: sets.New(0, 1), + podIndexLabelDisabled: true, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { logger, _ := ktesting.NewTestContext(t) defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)() // job manager setup clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) @@ -871,7 +883,7 @@ func TestControllerSyncJob(t *testing.T) { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) } if tc.completionMode == batch.IndexedCompletion { - checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name) + checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name, tc.podIndexLabelDisabled) } else { for _, p := range fakePodControl.Templates { // Fake pod control doesn't add generate name from the owner reference. @@ -958,11 +970,14 @@ func TestControllerSyncJob(t *testing.T) { } } -func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string) { +func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string, podIndexLabelDisabled bool) { t.Helper() gotIndexes := sets.New[int]() for _, p := range control.Templates { - checkJobCompletionEnvVariable(t, &p.Spec) + checkJobCompletionEnvVariable(t, &p.Spec, podIndexLabelDisabled) + if !podIndexLabelDisabled { + checkJobCompletionLabel(t, &p) + } ix := getCompletionIndex(p.Annotations) if ix == -1 { t.Errorf("Created pod %s didn't have completion index", p.Name) @@ -4406,14 +4421,28 @@ func TestFinalizersRemovedExpectations(t *testing.T) { } } -func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { +func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) { t.Helper() + labels := p.GetLabels() + if labels == nil || labels[batch.JobCompletionIndexAnnotation] == "" { + t.Errorf("missing expected pod label %s", batch.JobCompletionIndexAnnotation) + } +} + +func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabelDisabled bool) { + t.Helper() + var fieldPath string + if podIndexLabelDisabled { + fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation) + } else { + fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation) + } want := []v1.EnvVar{ { Name: "JOB_COMPLETION_INDEX", ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{ - FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation), + FieldPath: fieldPath, }, }, }, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 701f5922f44..314b08868c3 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -606,6 +606,13 @@ const ( // the pod is being deleted due to a disruption. PodDisruptionConditions featuregate.Feature = "PodDisruptionConditions" + // owner: @danielvegamyhre + // kep: https://kep.k8s.io/4017 + // beta: v1.28 + // + // Set pod completion index as a pod label for Indexed Jobs. + PodIndexLabel featuregate.Feature = "PodIndexLabel" + // owner: @ddebroy // alpha: v1.25 // @@ -1086,6 +1093,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha}, + PodIndexLabel: {Default: true, PreRelease: featuregate.Beta}, + // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: