From e64e34e0298d27d4099b632f5b7c1ba38fc66561 Mon Sep 17 00:00:00 2001 From: Mengxue Zhang Date: Thu, 29 Apr 2021 03:33:36 +0000 Subject: [PATCH] specify pod name and hostname in indexed job --- pkg/controller/controller_utils.go | 42 +++++++++++++++---- pkg/controller/controller_utils_test.go | 42 +++++++++++++++++++ pkg/controller/job/indexed_job_utils.go | 10 +++++ pkg/controller/job/indexed_job_utils_test.go | 32 ++++++++++++++ pkg/controller/job/job_controller.go | 4 +- pkg/controller/job/job_controller_test.go | 9 +++- .../apiserver/pkg/storage/names/generate.go | 6 +-- test/e2e/apps/job.go | 6 ++- test/integration/job/job_test.go | 24 +++++++---- 9 files changed, 151 insertions(+), 24 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index e7c1131ecda..86c2146335d 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -449,6 +449,8 @@ type PodControlInterface interface { CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + // CreatePodsWithControllerRefAndGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName. + CreatePodsWithControllerRefAndGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error // DeletePod deletes the pod identified by podID. DeletePod(namespace string, podID string, object runtime.Object) error // PatchPod patches the pod. 
@@ -514,14 +516,29 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error { } func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { - return r.createPods(namespace, template, object, nil) + pod, err := GetPodFromTemplate(template, object, nil) + if err != nil { + return err + } + return r.createPods(namespace, pod, object) } func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { + return r.CreatePodsWithControllerRefAndGenerateName(namespace, template, controllerObject, controllerRef, "") +} + +func (r RealPodControl) CreatePodsWithControllerRefAndGenerateName(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error { if err := validateControllerRef(controllerRef); err != nil { return err } - return r.createPods(namespace, template, controllerObject, controllerRef) + pod, err := GetPodFromTemplate(template, controllerObject, controllerRef) + if err != nil { + return err + } + if len(generateName) > 0 { + pod.ObjectMeta.GenerateName = generateName + } + return r.createPods(namespace, pod, controllerObject) } func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { @@ -554,11 +571,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec return pod, nil } -func (r RealPodControl) createPods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { - pod, err := GetPodFromTemplate(template, object, controllerRef) - if err != nil { - return err - } +func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime.Object) error { if len(labels.Set(pod.Labels)) == 0 { return fmt.Errorf("unable to create pods, no labels") } @@ -652,6 +665,21 @@ func (f 
*FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1. return nil } +func (f *FakePodControl) CreatePodsWithControllerRefAndGenerateName(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error { + f.Lock() + defer f.Unlock() + f.CreateCallCount++ + if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit { + return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount) + } + f.Templates = append(f.Templates, *spec) + f.ControllerRefs = append(f.ControllerRefs, *controllerRef) + if f.Err != nil { + return f.Err + } + return nil +} + func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error { f.Lock() defer f.Unlock() diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index e5139c60d3f..b2aaefe3731 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -29,6 +29,7 @@ import ( "time" apps "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -320,6 +321,47 @@ func TestCreatePods(t *testing.T) { "Body: %s", fakeHandler.RequestBody) } +func TestCreatePodsWithControllerRefAndGenerateName(t *testing.T) { + ns := metav1.NamespaceDefault + body := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "empty_pod"}}) + fakeHandler := utiltesting.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + + podControl := RealPodControl{ + 
KubeClient: clientset, + Recorder: &record.FakeRecorder{}, + } + + controllerSpec := newReplicationController(1) + controllerRef := metav1.NewControllerRef(controllerSpec, batchv1.SchemeGroupVersion.WithKind("Job")) + + // Make sure CreatePodsWithControllerRefAndGenerateName sends a POST to the apiserver with a pod built from the controller's pod template and the requested generateName + generateName := "hello-" + err := podControl.CreatePodsWithControllerRefAndGenerateName(ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName) + assert.NoError(t, err, "unexpected error: %v", err) + + expectedPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: controllerSpec.Spec.Template.Labels, + GenerateName: generateName, + OwnerReferences: []metav1.OwnerReference{*controllerRef}, + }, + Spec: controllerSpec.Spec.Template.Spec, + } + + fakeHandler.ValidateRequest(t, "/api/v1/namespaces/default/pods", "POST", nil) + var actualPod = &v1.Pod{} + err = json.Unmarshal([]byte(fakeHandler.RequestBody), actualPod) + assert.NoError(t, err, "unexpected error: %v", err) + assert.True(t, apiequality.Semantic.DeepDerivative(&expectedPod, actualPod), + "Body: %s", fakeHandler.RequestBody) +} + func TestDeletePodsAllowsMissing(t *testing.T) { fakeClient := fake.NewSimpleClientset() podControl := RealPodControl{ diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 9d8d9337314..d456ffeb830 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -26,6 +26,7 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/storage/names" "k8s.io/kubernetes/pkg/controller" ) @@ -216,6 +217,15 @@ func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) { template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index) } +func podGenerateNameWithIndex(jobName string, index int) string { + appendIndex := "-" + strconv.Itoa(index) + "-" + generateNamePrefix
:= jobName + appendIndex + if len(generateNamePrefix) > names.MaxGeneratedNameLength { + generateNamePrefix = generateNamePrefix[:names.MaxGeneratedNameLength-len(appendIndex)] + appendIndex + } + return generateNamePrefix +} + type byCompletionIndex []*v1.Pod func (bci byCompletionIndex) Less(i, j int) bool { diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index 46dfa0c38a4..cad45172d08 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -269,6 +269,38 @@ func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) { } } +func TestPodGenerateNameWithIndex(t *testing.T) { + cases := map[string]struct { + jobname string + index int + wantPodGenerateName string + }{ + "short job name": { + jobname: "indexed-job", + index: 1, + wantPodGenerateName: "indexed-job-1-", + }, + "job name exceeds MaxGeneneratedNameLength": { + jobname: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooo", + index: 1, + wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-", + }, + "job name with index suffix exceeds MaxGeneratedNameLength": { + jobname: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhoo", + index: 1, + wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-", + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + podGenerateName := podGenerateNameWithIndex(tc.jobname, tc.index) + if diff := cmp.Equal(tc.wantPodGenerateName, podGenerateName); !diff { + t.Errorf("Got pod generateName %s, want %s", podGenerateName, tc.wantPodGenerateName) + } + }) + } +} + func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod { pods := make([]*v1.Pod, 0, len(descs)) for _, desc := range descs { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index df2964c028c..6e0a684b5df 100644 --- a/pkg/controller/job/job_controller.go +++ 
b/pkg/controller/job/job_controller.go @@ -871,9 +871,15 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded if completionIndex != unknownCompletionIndex { template = podTemplate.DeepCopy() addCompletionIndexAnnotation(template, completionIndex) + template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex) } defer wait.Done() - err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind)) + // Only pods with a known completion index get an index-based generateName; an empty generateName leaves the prefix produced by GetPodFromTemplate in place. + generateName := "" + if completionIndex != unknownCompletionIndex { + generateName = podGenerateNameWithIndex(job.Name, completionIndex) + } + err := jm.podControl.CreatePodsWithControllerRefAndGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName) if err != nil { if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { // If the namespace is being torn down, we can safely ignore diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 745c64c50b9..156a3972d19 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -151,6 +151,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status p.Annotations = map[string]string{ batch.JobCompletionIndexAnnotation: s.Index, } + p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index) } podIndexer.Add(p) } @@ -735,7 +736,7 @@ func TestControllerSyncJob(t *testing.T) { t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates)) } if tc.completionMode == batch.IndexedCompletion { - checkCompletionIndexesInPods(t, &fakePodControl, tc.expectedCreatedIndexes) + checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name) } if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions { t.Errorf("Unexpected number of deletes. 
Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName)) @@ -806,7 +807,7 @@ func TestControllerSyncJob(t *testing.T) { } } -func checkCompletionIndexesInPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int) { +func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int, jobName string) { t.Helper() gotIndexes := sets.NewInt() for _, p := range control.Templates { @@ -817,6 +818,10 @@ func checkCompletionIndexesInPods(t *testing.T, control *controller.FakePodContr } else { gotIndexes.Insert(ix) } + expectedName := fmt.Sprintf("%s-%d", jobName, ix) + if diff := cmp.Equal(expectedName, p.Spec.Hostname); !diff { + t.Errorf("Got pod hostname %s, want %s", p.Spec.Hostname, expectedName) + } } if diff := cmp.Diff(wantIndexes.List(), gotIndexes.List()); diff != "" { t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/names/generate.go b/staging/src/k8s.io/apiserver/pkg/storage/names/generate.go index aad9a07f9ad..f7fb4c9414a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/names/generate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/names/generate.go @@ -43,12 +43,12 @@ const ( // TODO: make this flexible for non-core resources with alternate naming rules. 
maxNameLength = 63 randomLength = 5 - maxGeneratedNameLength = maxNameLength - randomLength + MaxGeneratedNameLength = maxNameLength - randomLength ) func (simpleNameGenerator) GenerateName(base string) string { - if len(base) > maxGeneratedNameLength { - base = base[:maxGeneratedNameLength] + if len(base) > MaxGeneratedNameLength { + base = base[:MaxGeneratedNameLength] } return fmt.Sprintf("%s%s", base, utilrand.String(randomLength)) } diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 2cc906022ae..e141f1cf958 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -153,7 +153,7 @@ var _ = SIGDescribe("Job", func() { Testcase: Ensure Pods of an Indexed Job get a unique index. Description: Create an Indexed Job, wait for completion, capture the output of the pods and verify that they contain the completion index. */ - ginkgo.It("[Feature:IndexedJob] should create pods for an Indexed job with completion indexes", func() { + ginkgo.It("[Feature:IndexedJob] should create pods for an Indexed job with completion indexes and specified hostname", func() { ginkgo.By("Creating Indexed job") job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) mode := batchv1.IndexedCompletion @@ -174,11 +174,13 @@ var _ = SIGDescribe("Job", func() { ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotation]) framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name) succeededIndexes.Insert(ix) + expectedName := fmt.Sprintf("%s-%d", job.Name, ix) + framework.ExpectEqual(pod.Spec.Hostname, expectedName, "expected completed pod with hostname %s, but got %s", expectedName, pod.Spec.Hostname) } } gotIndexes := succeededIndexes.List() wantIndexes := []int{0, 1, 2, 3} - framework.ExpectEqual(gotIndexes, wantIndexes, "expected completed indexes %s, but got %s", gotIndexes, wantIndexes) + framework.ExpectEqual(gotIndexes, wantIndexes, "expected 
completed indexes %s, but got %s", wantIndexes, gotIndexes) }) /* diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 69c710d11b3..75cc705cd27 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -214,7 +214,7 @@ func TestIndexedJob(t *testing.T) { validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Active: 3, }) - validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "") // One Pod succeeds. if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { @@ -224,7 +224,7 @@ func TestIndexedJob(t *testing.T) { Active: 3, Succeeded: 1, }) - validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1") // Disable feature gate and restart controller. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)() @@ -243,7 +243,7 @@ func TestIndexedJob(t *testing.T) { if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil { t.Errorf("Waiting for an event for IndexedJobDisabled: %v", err) } - validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1") // Re-enable feature gate and restart controller. Failed Pod should be recreated now. defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)() @@ -255,7 +255,7 @@ func TestIndexedJob(t *testing.T) { Failed: 1, Succeeded: 1, }) - validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1") + validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1") // Remaining Pods succeed. 
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { @@ -266,7 +266,7 @@ func TestIndexedJob(t *testing.T) { Failed: 1, Succeeded: 4, }) - validateJobPodsIndexes(ctx, t, clientSet, jobObj, nil, "0-3") + validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3") validateJobSucceeded(ctx, t, clientSet, jobObj) } @@ -439,9 +439,10 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse } } -// validateJobPodsIndexes validates indexes of active and completed Pods of an -// Indexed Job. Call after validateJobPodsStatus -func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Int, gotCompleted string) { +// validateIndexedJobPods validates indexes and hostname of +// active and completed Pods of an Indexed Job. +// Call after validateJobPodsStatus +func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Int, gotCompleted string) { t.Helper() updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) if err != nil { @@ -458,11 +459,16 @@ func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clients for _, pod := range pods.Items { if isPodOwnedByJob(&pod, jobObj) { if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning { - if ix, err := getCompletionIndex(&pod); err != nil { + ix, err := getCompletionIndex(&pod) + if err != nil { t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err) } else { gotActive.Insert(ix) } + expectedName := fmt.Sprintf("%s-%d", jobObj.Name, ix) + if diff := cmp.Equal(expectedName, pod.Spec.Hostname); !diff { + t.Errorf("Got pod hostname %s, want %s", pod.Spec.Hostname, expectedName) + } } } }