diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 76fa7013bfb..38cb2979fe2 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" @@ -128,6 +129,7 @@ func Run(s *options.CMServer) error { } kubeconfig.ContentConfig.ContentType = s.ContentType + kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} // Override kubeconfig qps/burst settings from flags kubeconfig.QPS = s.KubeAPIQPS kubeconfig.Burst = int(s.KubeAPIBurst) @@ -390,7 +392,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting %s apis", groupVersion) if containsResource(resources, "scheduledjobs") { glog.Infof("Starting scheduledjob controller") - go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))). + go scheduledjob.NewScheduledJobController(kubeClient). Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) diff --git a/hack/e2e.go b/hack/e2e.go index 2e00c14fa9a..e4e98e195e3 100644 --- a/hack/e2e.go +++ b/hack/e2e.go @@ -265,6 +265,8 @@ func TearDown() bool { // Up brings an e2e cluster up, recreating it if one is already running. 
func Up() bool { + // force having batch/v2alpha1 always on for e2e tests + os.Setenv("KUBE_RUNTIME_CONFIG", "batch/v2alpha1=true") return finishRunning("up", exec.Command("./hack/e2e-internal/e2e-up.sh")) } diff --git a/pkg/controller/scheduledjob/controller.go b/pkg/controller/scheduledjob/controller.go index 956e9eb44fd..0ba32b4bc46 100644 --- a/pkg/controller/scheduledjob/controller.go +++ b/pkg/controller/scheduledjob/controller.go @@ -37,9 +37,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" @@ -50,20 +49,20 @@ import ( // Utilities for dealing with Jobs and ScheduledJobs and time. type ScheduledJobController struct { - kubeClient clientset.Interface + kubeClient *client.Client jobControl jobControlInterface sjControl sjControlInterface recorder record.EventRecorder } -func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController { +func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. 
- eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) - if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter()) + if kubeClient != nil && kubeClient.GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.GetRateLimiter()) } jm := &ScheduledJobController{ @@ -76,7 +75,7 @@ func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobCont return jm } -func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController { +func NewScheduledJobControllerFromClient(kubeClient *client.Client) *ScheduledJobController { jm := NewScheduledJobController(kubeClient) return jm } diff --git a/pkg/controller/scheduledjob/injection.go b/pkg/controller/scheduledjob/injection.go index fed5c06aa51..9181dd001c9 100644 --- a/pkg/controller/scheduledjob/injection.go +++ b/pkg/controller/scheduledjob/injection.go @@ -20,8 +20,8 @@ import ( "sync" "k8s.io/kubernetes/pkg/apis/batch" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" ) @@ -33,7 +33,7 @@ type sjControlInterface interface { // realSJControl is the default implementation of sjControlInterface. type realSJControl struct { - KubeClient clientset.Interface + KubeClient *client.Client } var _ sjControlInterface = &realSJControl{} @@ -69,7 +69,7 @@ type jobControlInterface interface { // realJobControl is the default implementation of jobControlInterface. 
type realJobControl struct { - KubeClient clientset.Interface + KubeClient *client.Client Recorder record.EventRecorder } diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 1d1cef1de64..2020635e234 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_2" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" "k8s.io/kubernetes/pkg/client/restclient" @@ -95,8 +96,9 @@ type TestDataSummary interface { } type FrameworkOptions struct { - ClientQPS float32 - ClientBurst int + ClientQPS float32 + ClientBurst int + GroupVersion *unversioned.GroupVersion } // NewFramework makes a new framework and sets up a BeforeEach/AfterEach for @@ -172,6 +174,9 @@ func (f *Framework) BeforeEach() { Expect(err).NotTo(HaveOccurred()) config.QPS = f.options.ClientQPS config.Burst = f.options.ClientBurst + if f.options.GroupVersion != nil { + config.GroupVersion = f.options.GroupVersion + } if TestContext.KubeAPIContentType != "" { config.ContentType = TestContext.KubeAPIContentType } diff --git a/test/e2e/scheduledjob.go b/test/e2e/scheduledjob.go new file mode 100644 index 00000000000..0aa7799e43f --- /dev/null +++ b/test/e2e/scheduledjob.go @@ -0,0 +1,245 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/batch" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/test/e2e/framework" +) + +const ( + // How long to wait for a scheduledjob + scheduledJobTimeout = 5 * time.Minute +) + +var _ = framework.KubeDescribe("ScheduledJob", func() { + options := framework.FrameworkOptions{ + ClientQPS: 20, + ClientBurst: 50, + GroupVersion: &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}, + } + f := framework.NewFramework("scheduledjob", options, nil) + + // multiple jobs running at once + It("should schedule multiple jobs concurrently", func() { + By("Creating a scheduledjob") + scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent) + scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring more than one job is running at a time") + err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 2) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring exactly two jobs exists by listing jobs explicitly") + jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(len(jobs.Items) >= 2).To(BeTrue()) + + By("Removing scheduledjob") + err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) + + // suspended should not schedule jobs + It("should not schedule jobs when suspended", func() { + By("Creating a suspended scheduledjob") + scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent) + scheduledJob.Spec.Suspend = newBool(true) + 
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring no jobs are scheduled") + err = waitForNoJobs(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).To(HaveOccurred()) + + By("Ensuring no job exists by listing jobs explicitly") + jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(jobs.Items).To(HaveLen(0)) + + By("Removing scheduledjob") + err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) + + // only single active job is allowed for ForbidConcurrent + It("should not schedule new jobs when ForbidConcurrent", func() { + By("Creating a ForbidConcurrent scheduledjob") + scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent) + scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring a job is scheduled") + err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring exactly one is scheduled") + scheduledJob, err = getScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(scheduledJob.Status.Active).Should(HaveLen(1)) + + By("Ensuring exactly one job exists by listing jobs explicitly") + jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(jobs.Items).To(HaveLen(1)) + + By("Ensuring no more jobs are scheduled") + err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 2) + Expect(err).To(HaveOccurred()) + + By("Removing scheduledjob") + err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) + + // only single active job is allowed for ReplaceConcurrent + It("should replace jobs when 
ReplaceConcurrent", func() { + By("Creating a ReplaceConcurrent scheduledjob") + scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent) + scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring a job is scheduled") + err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 1) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring exactly one is scheduled") + scheduledJob, err = getScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(scheduledJob.Status.Active).Should(HaveLen(1)) + + By("Ensuring exactly one job exists by listing jobs explicitly") + jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(jobs.Items).To(HaveLen(1)) + + By("Ensuring the job is replaced with a new one") + err = waitForJobReplaced(f.Client, f.Namespace.Name, jobs.Items[0].Name) + Expect(err).NotTo(HaveOccurred()) + + By("Removing scheduledjob") + err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) +}) + +// newTestScheduledJob returns a scheduledjob which does one of several testing behaviors. 
+func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy) *batch.ScheduledJob { + parallelism := int32(1) + completions := int32(1) + return &batch.ScheduledJob{ + ObjectMeta: api.ObjectMeta{ + Name: name, + }, + Spec: batch.ScheduledJobSpec{ + Schedule: schedule, + ConcurrencyPolicy: concurrencyPolicy, + JobTemplate: batch.JobTemplateSpec{ + Spec: batch.JobSpec{ + Parallelism: ¶llelism, + Completions: &completions, + Template: api.PodTemplateSpec{ + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyOnFailure, + Volumes: []api.Volume{ + { + Name: "data", + VolumeSource: api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []api.Container{ + { + Name: "c", + Image: "gcr.io/google_containers/busybox:1.24", + Command: []string{"sleep", "300"}, + VolumeMounts: []api.VolumeMount{ + { + MountPath: "/data", + Name: "data", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func createScheduledJob(c *client.Client, ns string, scheduledJob *batch.ScheduledJob) (*batch.ScheduledJob, error) { + return c.Batch().ScheduledJobs(ns).Create(scheduledJob) +} + +func getScheduledJob(c *client.Client, ns, name string) (*batch.ScheduledJob, error) { + return c.Batch().ScheduledJobs(ns).Get(name) +} + +func deleteScheduledJob(c *client.Client, ns, name string) error { + return c.Batch().ScheduledJobs(ns).Delete(name, nil) +} + +// Wait for at least given amount of active jobs. +func waitForActiveJobs(c *client.Client, ns, scheduledJobName string, active int) error { + return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { + curr, err := c.Batch().ScheduledJobs(ns).Get(scheduledJobName) + if err != nil { + return false, err + } + return len(curr.Status.Active) >= active, nil + }) +} + +// Wait for no jobs to appear. 
+func waitForNoJobs(c *client.Client, ns, jobName string) error { + return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { + curr, err := c.Batch().ScheduledJobs(ns).Get(jobName) + if err != nil { + return false, err + } + + return len(curr.Status.Active) != 0, nil + }) +} + +// Wait for a job to be replaced with a new one. +func waitForJobReplaced(c *client.Client, ns, previousJobName string) error { + return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { + jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{}) + if err != nil { + return false, err + } + if len(jobs.Items) != 1 { + return false, fmt.Errorf("More than one job is running") + } + return jobs.Items[0].Name != previousJobName, nil + }) +}