mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 20:42:26 +00:00
Merge pull request #118883 from danielvegamyhre/kep-4017-job
Add completion index as pod label for indexed jobs
This commit is contained in:
commit
6f3856f953
@ -26,8 +26,10 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apiserver/pkg/storage/names"
|
||||
"k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -294,11 +296,17 @@ func addCompletionIndexEnvVariable(container *v1.Container) {
|
||||
return
|
||||
}
|
||||
}
|
||||
var fieldPath string
|
||||
if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
|
||||
fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation)
|
||||
} else {
|
||||
fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation)
|
||||
}
|
||||
container.Env = append(container.Env, v1.EnvVar{
|
||||
Name: completionIndexEnvName,
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
FieldRef: &v1.ObjectFieldSelector{
|
||||
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation),
|
||||
FieldPath: fieldPath,
|
||||
},
|
||||
},
|
||||
})
|
||||
@ -311,6 +319,14 @@ func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
|
||||
template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
|
||||
}
|
||||
|
||||
func addCompletionIndexLabel(template *v1.PodTemplateSpec, index int) {
|
||||
if template.Labels == nil {
|
||||
template.Labels = make(map[string]string, 1)
|
||||
}
|
||||
// For consistency, we use the annotation batch.kubernetes.io/job-completion-index for the corresponding label as well.
|
||||
template.Labels[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
|
||||
}
|
||||
|
||||
func podGenerateNameWithIndex(jobName string, index int) string {
|
||||
appendIndex := "-" + strconv.Itoa(index) + "-"
|
||||
generateNamePrefix := jobName + appendIndex
|
||||
|
@ -1495,6 +1495,10 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
|
||||
if completionIndex != unknownCompletionIndex {
|
||||
template = podTemplate.DeepCopy()
|
||||
addCompletionIndexAnnotation(template, completionIndex)
|
||||
|
||||
if feature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
|
||||
addCompletionIndexLabel(template, completionIndex)
|
||||
}
|
||||
template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
|
||||
generateName = podGenerateNameWithIndex(job.Name, completionIndex)
|
||||
}
|
||||
|
@ -274,7 +274,8 @@ func TestControllerSyncJob(t *testing.T) {
|
||||
expectedPodPatches int
|
||||
|
||||
// features
|
||||
jobReadyPodsEnabled bool
|
||||
jobReadyPodsEnabled bool
|
||||
podIndexLabelDisabled bool
|
||||
}{
|
||||
"job start": {
|
||||
parallelism: 2,
|
||||
@ -781,12 +782,23 @@ func TestControllerSyncJob(t *testing.T) {
|
||||
expectedActive: 2,
|
||||
expectedPodPatches: 2,
|
||||
},
|
||||
"indexed job with podIndexLabel feature disabled": {
|
||||
parallelism: 2,
|
||||
completions: 5,
|
||||
backoffLimit: 6,
|
||||
completionMode: batch.IndexedCompletion,
|
||||
expectedCreations: 2,
|
||||
expectedActive: 2,
|
||||
expectedCreatedIndexes: sets.New(0, 1),
|
||||
podIndexLabelDisabled: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)()
|
||||
|
||||
// job manager setup
|
||||
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
|
||||
@ -871,7 +883,7 @@ func TestControllerSyncJob(t *testing.T) {
|
||||
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates))
|
||||
}
|
||||
if tc.completionMode == batch.IndexedCompletion {
|
||||
checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name)
|
||||
checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name, tc.podIndexLabelDisabled)
|
||||
} else {
|
||||
for _, p := range fakePodControl.Templates {
|
||||
// Fake pod control doesn't add generate name from the owner reference.
|
||||
@ -958,11 +970,14 @@ func TestControllerSyncJob(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string) {
|
||||
func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string, podIndexLabelDisabled bool) {
|
||||
t.Helper()
|
||||
gotIndexes := sets.New[int]()
|
||||
for _, p := range control.Templates {
|
||||
checkJobCompletionEnvVariable(t, &p.Spec)
|
||||
checkJobCompletionEnvVariable(t, &p.Spec, podIndexLabelDisabled)
|
||||
if !podIndexLabelDisabled {
|
||||
checkJobCompletionLabel(t, &p)
|
||||
}
|
||||
ix := getCompletionIndex(p.Annotations)
|
||||
if ix == -1 {
|
||||
t.Errorf("Created pod %s didn't have completion index", p.Name)
|
||||
@ -4406,14 +4421,28 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
|
||||
func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) {
|
||||
t.Helper()
|
||||
labels := p.GetLabels()
|
||||
if labels == nil || labels[batch.JobCompletionIndexAnnotation] == "" {
|
||||
t.Errorf("missing expected pod label %s", batch.JobCompletionIndexAnnotation)
|
||||
}
|
||||
}
|
||||
|
||||
func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabelDisabled bool) {
|
||||
t.Helper()
|
||||
var fieldPath string
|
||||
if podIndexLabelDisabled {
|
||||
fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation)
|
||||
} else {
|
||||
fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation)
|
||||
}
|
||||
want := []v1.EnvVar{
|
||||
{
|
||||
Name: "JOB_COMPLETION_INDEX",
|
||||
ValueFrom: &v1.EnvVarSource{
|
||||
FieldRef: &v1.ObjectFieldSelector{
|
||||
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation),
|
||||
FieldPath: fieldPath,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -606,6 +606,13 @@ const (
|
||||
// the pod is being deleted due to a disruption.
|
||||
PodDisruptionConditions featuregate.Feature = "PodDisruptionConditions"
|
||||
|
||||
// owner: @danielvegamyhre
|
||||
// kep: https://kep.k8s.io/4017
|
||||
// beta: v1.28
|
||||
//
|
||||
// Set pod completion index as a pod label for Indexed Jobs.
|
||||
PodIndexLabel featuregate.Feature = "PodIndexLabel"
|
||||
|
||||
// owner: @ddebroy
|
||||
// alpha: v1.25
|
||||
//
|
||||
@ -1086,6 +1093,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
PodIndexLabel: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
|
||||
// unintentionally on either side:
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user