From 1341e35a96170986b082f9a8238188770e08e5cb Mon Sep 17 00:00:00 2001
From: Adhityaa Chandrasekar
Date: Fri, 12 Mar 2021 14:14:05 +0000
Subject: [PATCH] add integration test for suspended jobs

Signed-off-by: Adhityaa Chandrasekar
---
 test/integration/job/job_test.go | 155 +++++++++++++++++++++++++++++--
 1 file changed, 147 insertions(+), 8 deletions(-)

diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index b77e6d38094..d1341254741 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -38,7 +38,7 @@ import (
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
-	"k8s.io/kubernetes/pkg/controller/job"
+	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/utils/pointer"
@@ -270,6 +270,138 @@ func TestIndexedJob(t *testing.T) {
 	validateJobSucceeded(ctx, t, clientSet, jobObj)
 }
 
+func TestSuspendJob(t *testing.T) {
+	type step struct {
+		flag       bool
+		wantActive int
+		wantStatus v1.ConditionStatus
+		wantReason string
+	}
+	testCases := []struct {
+		short       bool
+		featureGate bool
+		create      step
+		update      step
+	}{
+		// Exhaustively test all combinations other than trivial true->true and
+		// false->false cases.
+		{
+			short:       true,
+			featureGate: true,
+			create:      step{flag: false, wantActive: 2},
+			update:      step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
+		},
+		{
+			featureGate: true,
+			create:      step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
+			update:      step{flag: false, wantActive: 2, wantStatus: v1.ConditionFalse, wantReason: "Resumed"},
+		},
+		{
+			featureGate: false,
+			create:      step{flag: false, wantActive: 2},
+			update:      step{flag: true, wantActive: 2},
+		},
+		{
+			featureGate: false,
+			create:      step{flag: true, wantActive: 2},
+			update:      step{flag: false, wantActive: 2},
+		},
+	}
+
+	for _, tc := range testCases {
+		name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag)
+		t.Run(name, func(t *testing.T) {
+			if testing.Short() && !tc.short {
+				t.Skip("skipping expensive subtest")
+			}
+			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, tc.featureGate)()
+
+			closeFn, restConfig, clientSet, ns := setup(t, "suspend")
+			defer closeFn()
+			ctx, cancel := startJobController(restConfig, clientSet)
+			defer cancel()
+			events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer events.Stop()
+
+			parallelism := int32(2)
+			job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Parallelism: pointer.Int32Ptr(parallelism),
+					Completions: pointer.Int32Ptr(4),
+					Suspend:     pointer.BoolPtr(tc.create.flag),
+				},
+			})
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+
+			validate := func(s string, active int, status v1.ConditionStatus, reason string) {
+				validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
+					Active: active,
+				})
+				job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
+				if err != nil {
+					t.Fatalf("Failed to get Job after %s: %v", s, err)
+				}
+				if got, want := getJobConditionStatus(ctx, job, batchv1.JobSuspended), status; got != want {
+					t.Errorf("Unexpected Job condition %q status after %s: got %q, want %q", batchv1.JobSuspended, s, got, want)
+				}
+				if err := waitForEvent(events, job.UID, reason); err != nil {
+					t.Errorf("Waiting for event with reason %q after %s: %v", reason, s, err)
+				}
+			}
+			validate("create", tc.create.wantActive, tc.create.wantStatus, tc.create.wantReason)
+
+			job.Spec.Suspend = pointer.BoolPtr(tc.update.flag)
+			job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
+			if err != nil {
+				t.Fatalf("Failed to update Job: %v", err)
+			}
+			validate("update", tc.update.wantActive, tc.update.wantStatus, tc.update.wantReason)
+		})
+	}
+}
+
+func TestSuspendJobControllerRestart(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, true)()
+
+	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
+	defer closeFn()
+	ctx, cancel := startJobController(restConfig, clientSet)
+	defer func() {
+		cancel()
+	}()
+
+	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+		Spec: batchv1.JobSpec{
+			Parallelism: pointer.Int32Ptr(2),
+			Completions: pointer.Int32Ptr(4),
+			Suspend:     pointer.BoolPtr(true),
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to create Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
+		Active: 0,
+	})
+
+	// Disable feature gate and restart controller to test that pods get created.
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)()
+	cancel()
+	ctx, cancel = startJobController(restConfig, clientSet)
+	job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
+		Active: 2,
+	})
+}
+
 type podsByStatus struct {
 	Active int
 	Failed int
@@ -348,6 +480,9 @@ func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clients
 }
 
 func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
+	if reason == "" {
+		return nil
+	}
 	return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
 		for {
 			var ev watch.Event
@@ -368,6 +503,15 @@ func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
 	})
 }
 
+func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.JobConditionType) v1.ConditionStatus {
+	for _, cond := range job.Status.Conditions {
+		if cond.Type == cType {
+			return cond.Status
+		}
+	}
+	return ""
+}
+
 func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
 	t.Helper()
 	if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
@@ -375,12 +519,7 @@ func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset
 		if err != nil {
 			t.Fatalf("Failed to obtain updated Job: %v", err)
 		}
-		for _, cond := range j.Status.Conditions {
-			if cond.Type == batchv1.JobComplete && cond.Status == v1.ConditionTrue {
-				return true, nil
-			}
-		}
-		return false, nil
+		return getJobConditionStatus(ctx, j, batchv1.JobComplete) == v1.ConditionTrue, nil
 	}); err != nil {
 		t.Errorf("Waiting for Job to succeed: %v", err)
 	}
@@ -487,7 +626,7 @@ func startJobController(restConfig *restclient.Config, clientSet clientset.Inter
 	ctx, cancel := context.WithCancel(context.Background())
 	resyncPeriod := 12 * time.Hour
 	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod)
-	jc := job.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(1, ctx.Done())
 	return ctx, cancel
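To exercise just the new tests locally, something like the following should work in a standard kubernetes/kubernetes checkout (this assumes the usual integration-test prerequisites, such as a local etcd on PATH; the exact make target and flags can vary between releases):

    # Matches both TestSuspendJob and TestSuspendJobControllerRestart by prefix
    # and runs them against a locally started control plane.
    make test-integration WHAT=./test/integration/job KUBE_TEST_ARGS="-run TestSuspendJob"

Under "go test -short", TestSuspendJob keeps only the case marked short (feature gate enabled, Job created unsuspended and then suspended) and skips the remaining combinations as expensive.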