From c9e78f1cd5eefd064e4943811871b0839a5534c6 Mon Sep 17 00:00:00 2001 From: gmarek Date: Thu, 8 Dec 2016 15:13:45 +0100 Subject: [PATCH] Add an option to run Job in Density/Load config --- test/e2e/density.go | 10 ++++-- test/e2e/framework/BUILD | 1 + test/e2e/framework/util.go | 40 +++++++++++------------ test/e2e/load.go | 3 ++ test/utils/BUILD | 2 ++ test/utils/runners.go | 66 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 98 insertions(+), 24 deletions(-) diff --git a/test/e2e/density.go b/test/e2e/density.go index be3b61ef5b5..36c6463d563 100644 --- a/test/e2e/density.go +++ b/test/e2e/density.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/client/cache" @@ -257,8 +258,7 @@ func cleanupDensityTest(dtc DensityTestConfig) { name := dtc.Configs[i].GetName() namespace := dtc.Configs[i].GetNamespace() kind := dtc.Configs[i].GetKind() - // TODO: Remove Deployment guard once GC is implemented for Deployments. 
- if framework.TestContext.GarbageCollectorEnabled && kind != extensions.Kind("Deployment") { + if framework.TestContext.GarbageCollectorEnabled && kindSupportsGarbageCollector(kind) { By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind)) err := framework.DeleteResourceAndWaitForGC(dtc.ClientSet, kind, namespace, name) framework.ExpectNoError(err) @@ -480,6 +480,8 @@ var _ = framework.KubeDescribe("Density", func() { configs[i] = &testutils.ReplicaSetConfig{RCConfig: *baseConfig} case extensions.Kind("Deployment"): configs[i] = &testutils.DeploymentConfig{RCConfig: *baseConfig} + case batch.Kind("Job"): + configs[i] = &testutils.JobConfig{RCConfig: *baseConfig} default: framework.Failf("Unsupported kind: %v", itArg.kind) } @@ -804,3 +806,7 @@ func createRunningPodFromRC(wg *sync.WaitGroup, c clientset.Interface, name, ns, framework.ExpectNoError(framework.WaitForControlledPodsRunning(c, ns, name, api.Kind("ReplicationController"))) framework.Logf("Found pod '%s' running", name) } + +func kindSupportsGarbageCollector(kind schema.GroupKind) bool { + return kind != extensions.Kind("Deployment") && kind != batch.Kind("Job") +} diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 88398d997aa..6cccae9466b 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -35,6 +35,7 @@ go_library( "//pkg/api/validation:go_default_library", "//pkg/apimachinery/registered:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library", + "//pkg/apis/batch:go_default_library", "//pkg/apis/batch/v1:go_default_library", "//pkg/apis/componentconfig:go_default_library", "//pkg/apis/extensions:go_default_library", diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index abfbeb63634..761c1347396 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -56,6 +56,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" apps 
"k8s.io/kubernetes/pkg/apis/apps/v1beta1" + batchinternal "k8s.io/kubernetes/pkg/apis/batch" batch "k8s.io/kubernetes/pkg/apis/batch/v1" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" @@ -2642,16 +2643,7 @@ func RemoveTaintOffNode(c clientset.Interface, nodeName string, taint v1.Taint) } func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) { - switch kind { - case api.Kind("ReplicationController"): - return kubectl.ScalerFor(api.Kind("ReplicationController"), internalClientset) - case extensionsinternal.Kind("ReplicaSet"): - return kubectl.ScalerFor(extensionsinternal.Kind("ReplicaSet"), internalClientset) - case extensionsinternal.Kind("Deployment"): - return kubectl.ScalerFor(extensionsinternal.Kind("Deployment"), internalClientset) - default: - return nil, fmt.Errorf("Unsupported kind for getting Scaler: %v", kind) - } + return kubectl.ScalerFor(kind, internalClientset) } func ScaleResource( @@ -2790,6 +2782,8 @@ func getRuntimeObjectForKind(c clientset.Interface, kind schema.GroupKind, ns, n return c.Extensions().Deployments(ns).Get(name, metav1.GetOptions{}) case extensionsinternal.Kind("DaemonSet"): return c.Extensions().DaemonSets(ns).Get(name, metav1.GetOptions{}) + case batchinternal.Kind("Job"): + return c.Batch().Jobs(ns).Get(name, metav1.GetOptions{}) default: return nil, fmt.Errorf("Unsupported kind when getting runtime object: %v", kind) } @@ -2803,6 +2797,10 @@ func deleteResource(c clientset.Interface, kind schema.GroupKind, ns, name strin return c.Extensions().ReplicaSets(ns).Delete(name, deleteOption) case extensionsinternal.Kind("Deployment"): return c.Extensions().Deployments(ns).Delete(name, deleteOption) + case extensionsinternal.Kind("DaemonSet"): + return c.Extensions().DaemonSets(ns).Delete(name, deleteOption) + case batchinternal.Kind("Job"): + return c.Batch().Jobs(ns).Delete(name, deleteOption) default: 
return fmt.Errorf("Unsupported kind when deleting: %v", kind) } @@ -2818,6 +2816,8 @@ func getSelectorFromRuntimeObject(obj runtime.Object) (labels.Selector, error) { return metav1.LabelSelectorAsSelector(typed.Spec.Selector) case *extensions.DaemonSet: return metav1.LabelSelectorAsSelector(typed.Spec.Selector) + case *batch.Job: + return metav1.LabelSelectorAsSelector(typed.Spec.Selector) default: return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj) } @@ -2840,24 +2840,20 @@ func getReplicasFromRuntimeObject(obj runtime.Object) (int32, error) { return *typed.Spec.Replicas, nil } return 0, nil + case *batch.Job: + // TODO: currently we use pause pods so that's OK. When we'll want to switch to Pods + // that actually finish we need a better way to do this. + if typed.Spec.Parallelism != nil { + return *typed.Spec.Parallelism, nil + } + return 0, nil default: return -1, fmt.Errorf("Unsupported kind when getting number of replicas: %v", obj) } } func getReaperForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Reaper, error) { - switch kind { - case api.Kind("ReplicationController"): - return kubectl.ReaperFor(api.Kind("ReplicationController"), internalClientset) - case extensionsinternal.Kind("ReplicaSet"): - return kubectl.ReaperFor(extensionsinternal.Kind("ReplicaSet"), internalClientset) - case extensionsinternal.Kind("Deployment"): - return kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClientset) - case extensionsinternal.Kind("DaemonSet"): - return kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), internalClientset) - default: - return nil, fmt.Errorf("Unsupported kind: %v", kind) - } + return kubectl.ReaperFor(kind, internalClientset) } // DeleteResourceAndPods deletes a given resource and all pods it spawned diff --git a/test/e2e/load.go b/test/e2e/load.go index 1c3d6f7b674..e02bd8a86f3 100644 --- a/test/e2e/load.go +++ b/test/e2e/load.go @@ -29,6 +29,7 @@ import ( 
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" @@ -394,6 +395,8 @@ func generateConfigsForGroup( config = &testutils.ReplicaSetConfig{RCConfig: *baseConfig} case extensions.Kind("Deployment"): config = &testutils.DeploymentConfig{RCConfig: *baseConfig} + case batch.Kind("Job"): + config = &testutils.JobConfig{RCConfig: *baseConfig} default: framework.Failf("Unsupported kind for config creation: %v", kind) } diff --git a/test/utils/BUILD b/test/utils/BUILD index f64910e932d..408382bf60f 100644 --- a/test/utils/BUILD +++ b/test/utils/BUILD @@ -22,6 +22,8 @@ go_library( "//pkg/api/errors:go_default_library", "//pkg/api/resource:go_default_library", "//pkg/api/v1:go_default_library", + "//pkg/apis/batch:go_default_library", + "//pkg/apis/batch/v1:go_default_library", "//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/meta/v1:go_default_library", diff --git a/test/utils/runners.go b/test/utils/runners.go index 7a79895e357..be9589ccf12 100644 --- a/test/utils/runners.go +++ b/test/utils/runners.go @@ -27,6 +27,8 @@ import ( apierrs "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/api/v1" + batchinternal "k8s.io/kubernetes/pkg/apis/batch" + batch "k8s.io/kubernetes/pkg/apis/batch/v1" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" @@ -132,6 +134,10 @@ type ReplicaSetConfig struct { RCConfig } +type JobConfig struct { + RCConfig +} + // podInfo contains pod information useful for debugging e2e tests. 
type podInfo struct {
	oldHostname string
@@ -327,6 +333,66 @@ func (config *ReplicaSetConfig) create() error {
 	return nil
 }
 
+// RunJob launches (and verifies correctness) of a Job
+// and will wait for all pods it spawns to become "Running".
+// It's the caller's responsibility to clean up externally (i.e. use the
+// namespace lifecycle for handling Cleanup).
+func RunJob(config JobConfig) error {
+	err := config.create()
+	if err != nil {
+		return err
+	}
+	return config.start()
+}
+
+func (config *JobConfig) Run() error {
+	return RunJob(*config)
+}
+
+func (config *JobConfig) GetKind() schema.GroupKind {
+	return batchinternal.Kind("Job")
+}
+
+func (config *JobConfig) create() error {
+	job := &batch.Job{
+		ObjectMeta: v1.ObjectMeta{
+			Name: config.Name,
+		},
+		Spec: batch.JobSpec{
+			Parallelism: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
+			Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
+			Template: v1.PodTemplateSpec{
+				ObjectMeta: v1.ObjectMeta{
+					Labels: map[string]string{"name": config.Name},
+				},
+				Spec: v1.PodSpec{
+					Containers: []v1.Container{
+						{
+							Name:    config.Name,
+							Image:   config.Image,
+							Command: config.Command,
+						},
+					},
+					RestartPolicy: v1.RestartPolicyOnFailure,
+				},
+			},
+		},
+	}
+
+	if len(config.SecretNames) > 0 {
+		attachSecrets(&job.Spec.Template, config.SecretNames)
+	}
+
+	config.applyTo(&job.Spec.Template)
+
+	_, err := config.Client.Batch().Jobs(config.Namespace).Create(job)
+	if err != nil {
+		return fmt.Errorf("Error creating job: %v", err)
+	}
+	config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, *job.Spec.Parallelism)
+	return nil
+}
+
 // RunRC Launches (and verifies correctness) of a Replication Controller
 // and will wait for all pods it spawns to become "Running".
 // It's the caller's responsibility to clean up externally (i.e. use the