From a1f6b7b7ec3713af7b2e0158baf6b00dae58b39c Mon Sep 17 00:00:00 2001 From: Stephen Heywood Date: Fri, 25 Mar 2022 12:05:56 +1300 Subject: [PATCH] Create job lifecycle e2e test The test validates the following endpoints - deleteBatchV1CollectionNamespacedJob - listBatchV1JobForAllNamespaces - patchBatchV1NamespacedJob - replaceBatchV1NamespacedJob --- test/e2e/apps/job.go | 173 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index a0bfbeacc27..a462522ab7d 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -29,11 +29,16 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" batchinternal "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/test/e2e/framework" @@ -48,6 +53,18 @@ import ( "github.com/onsi/gomega" ) +type watchEventConfig struct { + framework *framework.Framework + resourceVersion string + w *cache.ListWatch + jobName string + watchEvent watch.EventType + extJob *batchv1.Job + updatedMetadataType string + updatedKey string + updatedValue string +} + var _ = SIGDescribe("Job", func() { f := framework.NewDefaultFramework("job") parallelism := int32(2) @@ -477,8 +494,164 @@ var _ = SIGDescribe("Job", func() { framework.ExpectEqual(string(job.UID), statusUID, fmt.Sprintf("job.UID: %v expected to match statusUID: %v ", job.UID, statusUID)) }) + /* + Description: Attempt to create a suspended Job which MUST succeed. + Attempt to patch the Job to include a new label which MUST succeed. + The label MUST be found. Attempt to replace the Job to include a + new annotation which MUST succeed. The annotation MUST be found. + Attempt to list all namespaces with a label selector which MUST + succeed. One list MUST be found. It MUST succeed at deleting a + collection of jobs via a label selector. + */ + ginkgo.It("should manage the lifecycle of a job", func() { + jobName := "e2e-" + utilrand.String(5) + label := map[string]string{"e2e-job-label": jobName} + labelSelector := labels.SelectorFromSet(label).String() + + ns := f.Namespace.Name + jobClient := f.ClientSet.BatchV1().Jobs(ns) + + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = labelSelector + return jobClient.Watch(context.TODO(), options) + }, + } + jobsList, err := jobClient.List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + framework.ExpectNoError(err, "failed to list Job") + + ginkgo.By("Creating a suspended job") + job := e2ejob.NewTestJob("succeed", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) + job.Labels = label + job.Spec.Suspend = pointer.BoolPtr(true) + job, err = e2ejob.CreateJob(f.ClientSet, ns, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", ns) + + ginkgo.By("Patching the Job") + payload := "{\"metadata\":{\"labels\":{\"" + jobName + "\":\"patched\"}}}" + patchedJob, err := f.ClientSet.BatchV1().Jobs(ns).Patch(context.TODO(), jobName, types.StrategicMergePatchType, []byte(payload), metav1.PatchOptions{}) + framework.ExpectNoError(err, "failed to patch Job %s in namespace %s", jobName, ns) + + ginkgo.By("Watching for Job to be patched") + c := watchEventConfig{ + framework: f, + resourceVersion: jobsList.ResourceVersion, + w: w, + jobName: jobName, + watchEvent: watch.Modified, + extJob: patchedJob, + updatedMetadataType: "label", + updatedKey: jobName, + updatedValue: "patched", + } + waitForJobEvent(c) + framework.ExpectEqual(patchedJob.Labels[jobName], "patched", "Did not find job label for this job. Current labels: %v", patchedJob.Labels) + + ginkgo.By("Updating the job") + var updatedJob *batchv1.Job + + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + patchedJob, err = jobClient.Get(context.TODO(), jobName, metav1.GetOptions{}) + framework.ExpectNoError(err, "Unable to get job %s", jobName) + patchedJob.Spec.Suspend = pointer.BoolPtr(false) + patchedJob.Annotations["updated"] = "true" + updatedJob, err = e2ejob.UpdateJob(f.ClientSet, ns, patchedJob) + return err + }) + framework.ExpectNoError(err, "failed to update job in namespace: %s", ns) + + ginkgo.By("Watching for Job to be updated") + c = watchEventConfig{ + framework: f, + resourceVersion: patchedJob.ResourceVersion, + w: w, + jobName: jobName, + watchEvent: watch.Modified, + extJob: updatedJob, + updatedMetadataType: "annotation", + updatedKey: "updated", + updatedValue: "true", + } + waitForJobEvent(c) + framework.ExpectEqual(updatedJob.Annotations["updated"], "true", "updated Job should have the applied annotation") + framework.Logf("Found Job annotations: %#v", patchedJob.Annotations) + + ginkgo.By("Listing all Jobs with LabelSelector") + jobs, err := f.ClientSet.BatchV1().Jobs("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + framework.ExpectNoError(err, "Failed to list job. %v", err) + framework.ExpectEqual(len(jobs.Items), 1, "Failed to find job %v", jobName) + testJob := jobs.Items[0] + framework.Logf("Job: %v as labels: %v", testJob.Name, testJob.Labels) + + ginkgo.By("Waiting for job to complete") + err = e2ejob.WaitForJobComplete(f.ClientSet, ns, jobName, completions) + framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", ns) + + ginkgo.By("Delete a job collection with a labelselector") + propagationPolicy := metav1.DeletePropagationBackground + err = f.ClientSet.BatchV1().Jobs(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}, metav1.ListOptions{LabelSelector: labelSelector}) + framework.ExpectNoError(err, "failed to delete job %s in namespace: %s", job.Name, ns) + + ginkgo.By("Watching for Job to be deleted") + c = watchEventConfig{ + framework: f, + resourceVersion: updatedJob.ResourceVersion, + w: w, + jobName: jobName, + watchEvent: watch.Deleted, + extJob: &testJob, + updatedMetadataType: "label", + updatedKey: "e2e-job-label", + updatedValue: jobName, + } + waitForJobEvent(c) + + ginkgo.By("Relist jobs to confirm deletion") + jobs, err = f.ClientSet.BatchV1().Jobs("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector}) + framework.ExpectNoError(err, "Failed to list job. %v", err) + framework.ExpectEqual(len(jobs.Items), 0, "Found job %v", jobName) + }) + }) +// waitForJobEvent is used to track and log Job events. +// As delivery of events is not actually guaranteed we +// will not return an error if we miss the required event. +func waitForJobEvent(config watchEventConfig) { + f := config.framework + ctx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStartShort) + defer cancel() + _, err := watchtools.Until(ctx, config.resourceVersion, config.w, func(event watch.Event) (bool, error) { + if job, ok := event.Object.(*batchv1.Job); ok { + + var key string + switch config.updatedMetadataType { + case "annotation": + key = job.Annotations[config.updatedKey] + case "label": + key = job.Labels[config.updatedKey] + } + + found := job.ObjectMeta.Name == config.extJob.ObjectMeta.Name && + job.ObjectMeta.Namespace == f.Namespace.Name && + key == config.updatedValue && + event.Type == config.watchEvent + if !found { + framework.Logf("Event %v observed for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations) + return false, nil + } + framework.Logf("Event %v found for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations) + return found, nil + } + framework.Logf("Observed event: %+v", event.Object) + return false, nil + }) + if err != nil { + j, _ := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Get(context.TODO(), config.jobName, metav1.GetOptions{}) + framework.Logf("We missed the %v event. Job details: %+v", config.watchEvent, j) + } +} + // waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail. func waitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error { return wait.Poll(framework.Poll, timeout, func() (bool, error) {