Merge pull request #38461 from gmarek/job

Automatic merge from submit-queue

Add an option to run Job in Density/Load config

cc @timothysc @jeremyeder 

@erictune @soltysh - I run this test and it seems to me that Job has noticeably worse performance than Deployment. I'll create an issue for this, but this PR is for easy repro.
This commit is contained in:
Kubernetes Submit Queue 2016-12-13 05:57:18 -08:00 committed by GitHub
commit 380ad617f3
6 changed files with 98 additions and 24 deletions

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/client/cache"
@ -257,8 +258,7 @@ func cleanupDensityTest(dtc DensityTestConfig) {
name := dtc.Configs[i].GetName()
namespace := dtc.Configs[i].GetNamespace()
kind := dtc.Configs[i].GetKind()
// TODO: Remove Deployment guard once GC is implemented for Deployments.
if framework.TestContext.GarbageCollectorEnabled && kind != extensions.Kind("Deployment") {
if framework.TestContext.GarbageCollectorEnabled && kindSupportsGarbageCollector(kind) {
By(fmt.Sprintf("Cleaning up only the %v, garbage collector will clean up the pods", kind))
err := framework.DeleteResourceAndWaitForGC(dtc.ClientSet, kind, namespace, name)
framework.ExpectNoError(err)
@ -480,6 +480,8 @@ var _ = framework.KubeDescribe("Density", func() {
configs[i] = &testutils.ReplicaSetConfig{RCConfig: *baseConfig}
case extensions.Kind("Deployment"):
configs[i] = &testutils.DeploymentConfig{RCConfig: *baseConfig}
case batch.Kind("Job"):
configs[i] = &testutils.JobConfig{RCConfig: *baseConfig}
default:
framework.Failf("Unsupported kind: %v", itArg.kind)
}
@ -804,3 +806,7 @@ func createRunningPodFromRC(wg *sync.WaitGroup, c clientset.Interface, name, ns,
framework.ExpectNoError(framework.WaitForControlledPodsRunning(c, ns, name, api.Kind("ReplicationController")))
framework.Logf("Found pod '%s' running", name)
}
func kindSupportsGarbageCollector(kind schema.GroupKind) bool {
return kind != extensions.Kind("Deployment") && kind != batch.Kind("Job")
}

View File

@ -35,6 +35,7 @@ go_library(
"//pkg/api/validation:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/extensions:go_default_library",

View File

@ -56,6 +56,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
batch "k8s.io/kubernetes/pkg/apis/batch/v1"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@ -2638,16 +2639,7 @@ func RemoveTaintOffNode(c clientset.Interface, nodeName string, taint v1.Taint)
}
func getScalerForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Scaler, error) {
switch kind {
case api.Kind("ReplicationController"):
return kubectl.ScalerFor(api.Kind("ReplicationController"), internalClientset)
case extensionsinternal.Kind("ReplicaSet"):
return kubectl.ScalerFor(extensionsinternal.Kind("ReplicaSet"), internalClientset)
case extensionsinternal.Kind("Deployment"):
return kubectl.ScalerFor(extensionsinternal.Kind("Deployment"), internalClientset)
default:
return nil, fmt.Errorf("Unsupported kind for getting Scaler: %v", kind)
}
return kubectl.ScalerFor(kind, internalClientset)
}
func ScaleResource(
@ -2786,6 +2778,8 @@ func getRuntimeObjectForKind(c clientset.Interface, kind schema.GroupKind, ns, n
return c.Extensions().Deployments(ns).Get(name, metav1.GetOptions{})
case extensionsinternal.Kind("DaemonSet"):
return c.Extensions().DaemonSets(ns).Get(name, metav1.GetOptions{})
case batchinternal.Kind("Job"):
return c.Batch().Jobs(ns).Get(name, metav1.GetOptions{})
default:
return nil, fmt.Errorf("Unsupported kind when getting runtime object: %v", kind)
}
@ -2799,6 +2793,10 @@ func deleteResource(c clientset.Interface, kind schema.GroupKind, ns, name strin
return c.Extensions().ReplicaSets(ns).Delete(name, deleteOption)
case extensionsinternal.Kind("Deployment"):
return c.Extensions().Deployments(ns).Delete(name, deleteOption)
case extensionsinternal.Kind("DaemonSet"):
return c.Extensions().DaemonSets(ns).Delete(name, deleteOption)
case batchinternal.Kind("Job"):
return c.Batch().Jobs(ns).Delete(name, deleteOption)
default:
return fmt.Errorf("Unsupported kind when deleting: %v", kind)
}
@ -2814,6 +2812,8 @@ func getSelectorFromRuntimeObject(obj runtime.Object) (labels.Selector, error) {
return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
case *extensions.DaemonSet:
return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
case *batch.Job:
return metav1.LabelSelectorAsSelector(typed.Spec.Selector)
default:
return nil, fmt.Errorf("Unsupported kind when getting selector: %v", obj)
}
@ -2836,24 +2836,20 @@ func getReplicasFromRuntimeObject(obj runtime.Object) (int32, error) {
return *typed.Spec.Replicas, nil
}
return 0, nil
case *batch.Job:
// TODO: currently we use pause pods so that's OK. When we'll want to switch to Pods
// that actually finish we need a better way to do this.
if typed.Spec.Parallelism != nil {
return *typed.Spec.Parallelism, nil
}
return 0, nil
default:
return -1, fmt.Errorf("Unsupported kind when getting number of replicas: %v", obj)
}
}
func getReaperForKind(internalClientset internalclientset.Interface, kind schema.GroupKind) (kubectl.Reaper, error) {
switch kind {
case api.Kind("ReplicationController"):
return kubectl.ReaperFor(api.Kind("ReplicationController"), internalClientset)
case extensionsinternal.Kind("ReplicaSet"):
return kubectl.ReaperFor(extensionsinternal.Kind("ReplicaSet"), internalClientset)
case extensionsinternal.Kind("Deployment"):
return kubectl.ReaperFor(extensionsinternal.Kind("Deployment"), internalClientset)
case extensionsinternal.Kind("DaemonSet"):
return kubectl.ReaperFor(extensionsinternal.Kind("DaemonSet"), internalClientset)
default:
return nil, fmt.Errorf("Unsupported kind: %v", kind)
}
return kubectl.ReaperFor(kind, internalClientset)
}
// DeleteResourceAndPods deletes a given resource and all pods it spawned

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
@ -394,6 +395,8 @@ func generateConfigsForGroup(
config = &testutils.ReplicaSetConfig{RCConfig: *baseConfig}
case extensions.Kind("Deployment"):
config = &testutils.DeploymentConfig{RCConfig: *baseConfig}
case batch.Kind("Job"):
config = &testutils.JobConfig{RCConfig: *baseConfig}
default:
framework.Failf("Unsupported kind for config creation: %v", kind)
}

View File

@ -22,6 +22,8 @@ go_library(
"//pkg/api/errors:go_default_library",
"//pkg/api/resource:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/meta/v1:go_default_library",

View File

@ -27,6 +27,8 @@ import (
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/v1"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
batch "k8s.io/kubernetes/pkg/apis/batch/v1"
extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
@ -132,6 +134,10 @@ type ReplicaSetConfig struct {
RCConfig
}
type JobConfig struct {
RCConfig
}
// podInfo contains pod information useful for debugging e2e tests.
type podInfo struct {
oldHostname string
@ -327,6 +333,66 @@ func (config *ReplicaSetConfig) create() error {
return nil
}
// RunJob baunches (and verifies correctness) of a Job
// and will wait for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunJob(config JobConfig) error {
err := config.create()
if err != nil {
return err
}
return config.start()
}
func (config *JobConfig) Run() error {
return RunJob(*config)
}
func (config *JobConfig) GetKind() schema.GroupKind {
return batchinternal.Kind("Job")
}
func (config *JobConfig) create() error {
job := &batch.Job{
ObjectMeta: v1.ObjectMeta{
Name: config.Name,
},
Spec: batch.JobSpec{
Parallelism: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
Template: v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{"name": config.Name},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: config.Name,
Image: config.Image,
Command: config.Command,
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
}
if len(config.SecretNames) > 0 {
attachSecrets(&job.Spec.Template, config.SecretNames)
}
config.applyTo(&job.Spec.Template)
_, err := config.Client.Batch().Jobs(config.Namespace).Create(job)
if err != nil {
return fmt.Errorf("Error creating job: %v", err)
}
config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, job.Spec.Parallelism)
return nil
}
// RunRC Launches (and verifies correctness) of a Replication Controller
// and will wait for all pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the