Merge pull request #100295 from adtac/suspend-int

add integration test for suspended jobs
This commit is contained in:
Kubernetes Prow Robot 2021-03-24 13:49:30 -07:00 committed by GitHub
commit e57223876d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -38,7 +38,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller/job"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
@ -270,6 +270,138 @@ func TestIndexedJob(t *testing.T) {
validateJobSucceeded(ctx, t, clientSet, jobObj)
}
func TestSuspendJob(t *testing.T) {
type step struct {
flag bool
wantActive int
wantStatus v1.ConditionStatus
wantReason string
}
testCases := []struct {
short bool
featureGate bool
create step
update step
}{
// Exhaustively test all combinations other than trivial true->true and
// false->false cases.
{
short: true,
featureGate: true,
create: step{flag: false, wantActive: 2},
update: step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
},
{
featureGate: true,
create: step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
update: step{flag: false, wantActive: 2, wantStatus: v1.ConditionFalse, wantReason: "Resumed"},
},
{
featureGate: false,
create: step{flag: false, wantActive: 2},
update: step{flag: true, wantActive: 2},
},
{
featureGate: false,
create: step{flag: true, wantActive: 2},
update: step{flag: false, wantActive: 2},
},
}
for _, tc := range testCases {
name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag)
t.Run(name, func(t *testing.T) {
if testing.Short() && !tc.short {
t.Skip("skipping expensive subtest")
}
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, tc.featureGate)()
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
defer cancel()
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
defer events.Stop()
parallelism := int32(2)
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(parallelism),
Completions: pointer.Int32Ptr(4),
Suspend: pointer.BoolPtr(tc.create.flag),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validate := func(s string, active int, status v1.ConditionStatus, reason string) {
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: active,
})
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job after %s: %v", s, err)
}
if got, want := getJobConditionStatus(ctx, job, batchv1.JobSuspended), status; got != want {
t.Errorf("Unexpected Job condition %q status after %s: got %q, want %q", batchv1.JobSuspended, s, got, want)
}
if err := waitForEvent(events, job.UID, reason); err != nil {
t.Errorf("Waiting for event with reason %q after %s: %v", reason, s, err)
}
}
validate("create", tc.create.wantActive, tc.create.wantStatus, tc.create.wantReason)
job.Spec.Suspend = pointer.BoolPtr(tc.update.flag)
job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update Job: %v", err)
}
validate("update", tc.update.wantActive, tc.update.wantStatus, tc.update.wantReason)
})
}
}
func TestSuspendJobControllerRestart(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, true)()
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
defer func() {
cancel()
}()
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(2),
Completions: pointer.Int32Ptr(4),
Suspend: pointer.BoolPtr(true),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: 0,
})
// Disable feature gate and restart controller to test that pods get created.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)()
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: 2,
})
}
type podsByStatus struct {
Active int
Failed int
@ -348,6 +480,9 @@ func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clients
}
func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
if reason == "" {
return nil
}
return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
for {
var ev watch.Event
@ -368,6 +503,15 @@ func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
})
}
func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.JobConditionType) v1.ConditionStatus {
for _, cond := range job.Status.Conditions {
if cond.Type == cType {
return cond.Status
}
}
return ""
}
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
t.Helper()
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
@ -375,12 +519,7 @@ func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset
if err != nil {
t.Fatalf("Failed to obtain updated Job: %v", err)
}
for _, cond := range j.Status.Conditions {
if cond.Type == batchv1.JobComplete && cond.Status == v1.ConditionTrue {
return true, nil
}
}
return false, nil
return getJobConditionStatus(ctx, j, batchv1.JobComplete) == v1.ConditionTrue, nil
}); err != nil {
t.Errorf("Waiting for Job to succeed: %v", err)
}
@ -487,7 +626,7 @@ func startJobController(restConfig *restclient.Config, clientSet clientset.Inter
ctx, cancel := context.WithCancel(context.Background())
resyncPeriod := 12 * time.Hour
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "cronjob-informers")), resyncPeriod)
jc := job.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
jc := jobcontroller.NewController(informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
informerSet.Start(ctx.Done())
go jc.Run(1, ctx.Done())
return ctx, cancel