Create job lifecycle e2e test

The test validates the following endpoints
- deleteBatchV1CollectionNamespacedJob
- listBatchV1JobForAllNamespaces
- patchBatchV1NamespacedJob
- replaceBatchV1NamespacedJob
This commit is contained in:
Stephen Heywood 2022-03-25 12:05:56 +13:00
parent 11b3a18cca
commit a1f6b7b7ec

View File

@ -29,11 +29,16 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/util/retry"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/test/e2e/framework"
@ -48,6 +53,18 @@ import (
"github.com/onsi/gomega"
)
type watchEventConfig struct {
framework *framework.Framework
resourceVersion string
w *cache.ListWatch
jobName string
watchEvent watch.EventType
extJob *batchv1.Job
updatedMetadataType string
updatedKey string
updatedValue string
}
var _ = SIGDescribe("Job", func() {
f := framework.NewDefaultFramework("job")
parallelism := int32(2)
@ -477,7 +494,163 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectEqual(string(job.UID), statusUID, fmt.Sprintf("job.UID: %v expected to match statusUID: %v ", job.UID, statusUID))
})
/*
Description: Attempt to create a suspended Job which MUST succeed.
Attempt to patch the Job to include a new label which MUST succeed.
The label MUST be found. Attempt to replace the Job to include a
new annotation which MUST succeed. The annotation MUST be found.
Attempt to list all namespaces with a label selector which MUST
succeed. One list MUST be found. It MUST succeed at deleting a
collection of jobs via a label selector.
*/
ginkgo.It("should manage the lifecycle of a job", func() {
jobName := "e2e-" + utilrand.String(5)
label := map[string]string{"e2e-job-label": jobName}
labelSelector := labels.SelectorFromSet(label).String()
ns := f.Namespace.Name
jobClient := f.ClientSet.BatchV1().Jobs(ns)
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector
return jobClient.Watch(context.TODO(), options)
},
}
jobsList, err := jobClient.List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "failed to list Job")
ginkgo.By("Creating a suspended job")
job := e2ejob.NewTestJob("succeed", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job.Labels = label
job.Spec.Suspend = pointer.BoolPtr(true)
job, err = e2ejob.CreateJob(f.ClientSet, ns, job)
framework.ExpectNoError(err, "failed to create job in namespace: %s", ns)
ginkgo.By("Patching the Job")
payload := "{\"metadata\":{\"labels\":{\"" + jobName + "\":\"patched\"}}}"
patchedJob, err := f.ClientSet.BatchV1().Jobs(ns).Patch(context.TODO(), jobName, types.StrategicMergePatchType, []byte(payload), metav1.PatchOptions{})
framework.ExpectNoError(err, "failed to patch Job %s in namespace %s", jobName, ns)
ginkgo.By("Watching for Job to be patched")
c := watchEventConfig{
framework: f,
resourceVersion: jobsList.ResourceVersion,
w: w,
jobName: jobName,
watchEvent: watch.Modified,
extJob: patchedJob,
updatedMetadataType: "label",
updatedKey: jobName,
updatedValue: "patched",
}
waitForJobEvent(c)
framework.ExpectEqual(patchedJob.Labels[jobName], "patched", "Did not find job label for this job. Current labels: %v", patchedJob.Labels)
ginkgo.By("Updating the job")
var updatedJob *batchv1.Job
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
patchedJob, err = jobClient.Get(context.TODO(), jobName, metav1.GetOptions{})
framework.ExpectNoError(err, "Unable to get job %s", jobName)
patchedJob.Spec.Suspend = pointer.BoolPtr(false)
patchedJob.Annotations["updated"] = "true"
updatedJob, err = e2ejob.UpdateJob(f.ClientSet, ns, patchedJob)
return err
})
framework.ExpectNoError(err, "failed to update job in namespace: %s", ns)
ginkgo.By("Watching for Job to be updated")
c = watchEventConfig{
framework: f,
resourceVersion: patchedJob.ResourceVersion,
w: w,
jobName: jobName,
watchEvent: watch.Modified,
extJob: updatedJob,
updatedMetadataType: "annotation",
updatedKey: "updated",
updatedValue: "true",
}
waitForJobEvent(c)
framework.ExpectEqual(updatedJob.Annotations["updated"], "true", "updated Job should have the applied annotation")
framework.Logf("Found Job annotations: %#v", patchedJob.Annotations)
ginkgo.By("Listing all Jobs with LabelSelector")
jobs, err := f.ClientSet.BatchV1().Jobs("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "Failed to list job. %v", err)
framework.ExpectEqual(len(jobs.Items), 1, "Failed to find job %v", jobName)
testJob := jobs.Items[0]
framework.Logf("Job: %v as labels: %v", testJob.Name, testJob.Labels)
ginkgo.By("Waiting for job to complete")
err = e2ejob.WaitForJobComplete(f.ClientSet, ns, jobName, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", ns)
ginkgo.By("Delete a job collection with a labelselector")
propagationPolicy := metav1.DeletePropagationBackground
err = f.ClientSet.BatchV1().Jobs(ns).DeleteCollection(context.TODO(), metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}, metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "failed to delete job %s in namespace: %s", job.Name, ns)
ginkgo.By("Watching for Job to be deleted")
c = watchEventConfig{
framework: f,
resourceVersion: updatedJob.ResourceVersion,
w: w,
jobName: jobName,
watchEvent: watch.Deleted,
extJob: &testJob,
updatedMetadataType: "label",
updatedKey: "e2e-job-label",
updatedValue: jobName,
}
waitForJobEvent(c)
ginkgo.By("Relist jobs to confirm deletion")
jobs, err = f.ClientSet.BatchV1().Jobs("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
framework.ExpectNoError(err, "Failed to list job. %v", err)
framework.ExpectEqual(len(jobs.Items), 0, "Found job %v", jobName)
})
})
// waitForJobEvent is used to track and log Job events.
// As delivery of events is not actually guaranteed we
// will not return an error if we miss the required event.
func waitForJobEvent(config watchEventConfig) {
f := config.framework
ctx, cancel := context.WithTimeout(context.Background(), f.Timeouts.PodStartShort)
defer cancel()
_, err := watchtools.Until(ctx, config.resourceVersion, config.w, func(event watch.Event) (bool, error) {
if job, ok := event.Object.(*batchv1.Job); ok {
var key string
switch config.updatedMetadataType {
case "annotation":
key = job.Annotations[config.updatedKey]
case "label":
key = job.Labels[config.updatedKey]
}
found := job.ObjectMeta.Name == config.extJob.ObjectMeta.Name &&
job.ObjectMeta.Namespace == f.Namespace.Name &&
key == config.updatedValue &&
event.Type == config.watchEvent
if !found {
framework.Logf("Event %v observed for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations)
return false, nil
}
framework.Logf("Event %v found for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations)
return found, nil
}
framework.Logf("Observed event: %+v", event.Object)
return false, nil
})
if err != nil {
j, _ := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Get(context.TODO(), config.jobName, metav1.GetOptions{})
framework.Logf("We missed the %v event. Job details: %+v", config.watchEvent, j)
}
}
// waitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
func waitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {