diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 38cb2979fe2..76fa7013bfb 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,7 +36,6 @@ import ( "k8s.io/kubernetes/cmd/kube-controller-manager/app/options" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apis/batch" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" @@ -129,7 +128,6 @@ func Run(s *options.CMServer) error { } kubeconfig.ContentConfig.ContentType = s.ContentType - kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"} // Override kubeconfig qps/burst settings from flags kubeconfig.QPS = s.KubeAPIQPS kubeconfig.Burst = int(s.KubeAPIBurst) @@ -392,7 +390,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting %s apis", groupVersion) if containsResource(resources, "scheduledjobs") { glog.Infof("Starting scheduledjob controller") - go scheduledjob.NewScheduledJobController(kubeClient). + go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))). Run(wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) diff --git a/hack/e2e.go b/hack/e2e.go index e4e98e195e3..2e00c14fa9a 100644 --- a/hack/e2e.go +++ b/hack/e2e.go @@ -265,8 +265,6 @@ func TearDown() bool { // Up brings an e2e cluster up, recreating it if one is already running. func Up() bool { - // force having batch/v2alpha1 always on for e2e tests - os.Setenv("KUBE_RUNTIME_CONFIG", "batch/v2alpha1=true") return finishRunning("up", exec.Command("./hack/e2e-internal/e2e-up.sh")) } diff --git a/pkg/controller/scheduledjob/controller.go b/pkg/controller/scheduledjob/controller.go index 0ba32b4bc46..956e9eb44fd 100644 --- a/pkg/controller/scheduledjob/controller.go +++ b/pkg/controller/scheduledjob/controller.go @@ -37,8 +37,9 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/metrics" @@ -49,20 +50,20 @@ import ( // Utilities for dealing with Jobs and ScheduledJobs and time. type ScheduledJobController struct { - kubeClient *client.Client + kubeClient clientset.Interface jobControl jobControlInterface sjControl sjControlInterface recorder record.EventRecorder } -func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobController { +func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. - eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")}) - if kubeClient != nil && kubeClient.GetRateLimiter() != nil { - metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.GetRateLimiter()) + if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter()) } jm := &ScheduledJobController{ @@ -75,7 +76,7 @@ func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobControlle return jm } -func NewScheduledJobControllerFromClient(kubeClient *client.Client) *ScheduledJobController { +func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController { jm := NewScheduledJobController(kubeClient) return jm } diff --git a/pkg/controller/scheduledjob/injection.go b/pkg/controller/scheduledjob/injection.go index 9181dd001c9..fed5c06aa51 100644 --- a/pkg/controller/scheduledjob/injection.go +++ b/pkg/controller/scheduledjob/injection.go @@ -20,8 +20,8 @@ import ( "sync" "k8s.io/kubernetes/pkg/apis/batch" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" - client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/labels" ) @@ -33,7 +33,7 @@ type sjControlInterface interface { // realSJControl is the default implementation of sjControlInterface. type realSJControl struct { - KubeClient *client.Client + KubeClient clientset.Interface } var _ sjControlInterface = &realSJControl{} @@ -69,7 +69,7 @@ type jobControlInterface interface { // realJobControl is the default implementation of jobControlInterface. type realJobControl struct { - KubeClient *client.Client + KubeClient clientset.Interface Recorder record.EventRecorder } diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 2020635e234..1d1cef1de64 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" "k8s.io/kubernetes/pkg/api" apierrs "k8s.io/kubernetes/pkg/api/errors" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_2" "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" "k8s.io/kubernetes/pkg/client/restclient" @@ -96,9 +95,8 @@ type TestDataSummary interface { } type FrameworkOptions struct { - ClientQPS float32 - ClientBurst int - GroupVersion *unversioned.GroupVersion + ClientQPS float32 + ClientBurst int } // NewFramework makes a new framework and sets up a BeforeEach/AfterEach for @@ -174,9 +172,6 @@ func (f *Framework) BeforeEach() { Expect(err).NotTo(HaveOccurred()) config.QPS = f.options.ClientQPS config.Burst = f.options.ClientBurst - if f.options.GroupVersion != nil { - config.GroupVersion = f.options.GroupVersion - } if TestContext.KubeAPIContentType != "" { config.ContentType = TestContext.KubeAPIContentType } diff --git a/test/e2e/scheduledjob.go b/test/e2e/scheduledjob.go deleted file mode 100644 index 0aa7799e43f..00000000000 --- a/test/e2e/scheduledjob.go +++ /dev/null @@ -1,245 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package e2e - -import ( - "fmt" - "time" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apis/batch" - client "k8s.io/kubernetes/pkg/client/unversioned" - "k8s.io/kubernetes/pkg/util/wait" - "k8s.io/kubernetes/test/e2e/framework" -) - -const ( - // How long to wait for a scheduledjob - scheduledJobTimeout = 5 * time.Minute -) - -var _ = framework.KubeDescribe("ScheduledJob", func() { - options := framework.FrameworkOptions{ - ClientQPS: 20, - ClientBurst: 50, - GroupVersion: &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}, - } - f := framework.NewFramework("scheduledjob", options, nil) - - // multiple jobs running at once - It("should schedule multiple jobs concurrently", func() { - By("Creating a scheduledjob") - scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent) - scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring more than one job is running at a time") - err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 2) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring exactly two jobs exists by listing jobs explicitly") - jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(len(jobs.Items) >= 2).To(BeTrue()) - - By("Removing scheduledjob") - err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).NotTo(HaveOccurred()) - }) - - // suspended should not schedule jobs - It("should not schedule jobs when suspended", func() { - By("Creating a suspended scheduledjob") - scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent) - scheduledJob.Spec.Suspend = newBool(true) - scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring no jobs are scheduled") - err = waitForNoJobs(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).To(HaveOccurred()) - - By("Ensuring no job exists by listing jobs explicitly") - jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(jobs.Items).To(HaveLen(0)) - - By("Removing scheduledjob") - err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).NotTo(HaveOccurred()) - }) - - // only single active job is allowed for ForbidConcurrent - It("should not schedule new jobs when ForbidConcurrent", func() { - By("Creating a ForbidConcurrent scheduledjob") - scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent) - scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring a job is scheduled") - err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 1) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring exactly one is scheduled") - scheduledJob, err = getScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).NotTo(HaveOccurred()) - Expect(scheduledJob.Status.Active).Should(HaveLen(1)) - - By("Ensuring exaclty one job exists by listing jobs explicitly") - jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(jobs.Items).To(HaveLen(1)) - - By("Ensuring no more jobs are scheduled") - err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 2) - Expect(err).To(HaveOccurred()) - - By("Removing scheduledjob") - err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).NotTo(HaveOccurred()) - }) - - // only single active job is allowed for ReplaceConcurrent - It("should replace jobs when ReplaceConcurrent", func() { - By("Creating a ReplaceConcurrent scheduledjob") - scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent) - scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring a job is scheduled") - err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 1) - Expect(err).NotTo(HaveOccurred()) - - By("Ensuring exactly one is scheduled") - scheduledJob, err = getScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).NotTo(HaveOccurred()) - Expect(scheduledJob.Status.Active).Should(HaveLen(1)) - - By("Ensuring exaclty one job exists by listing jobs explicitly") - jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{}) - Expect(err).NotTo(HaveOccurred()) - Expect(jobs.Items).To(HaveLen(1)) - - By("Ensuring the job is replaced with a new one") - err = waitForJobReplaced(f.Client, f.Namespace.Name, jobs.Items[0].Name) - Expect(err).NotTo(HaveOccurred()) - - By("Removing scheduledjob") - err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name) - Expect(err).NotTo(HaveOccurred()) - }) -}) - -// newTestScheduledJob returns a scheduledjob which does one of several testing behaviors. -func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy) *batch.ScheduledJob { - parallelism := int32(1) - completions := int32(1) - return &batch.ScheduledJob{ - ObjectMeta: api.ObjectMeta{ - Name: name, - }, - Spec: batch.ScheduledJobSpec{ - Schedule: schedule, - ConcurrencyPolicy: concurrencyPolicy, - JobTemplate: batch.JobTemplateSpec{ - Spec: batch.JobSpec{ - Parallelism: ¶llelism, - Completions: &completions, - Template: api.PodTemplateSpec{ - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicyOnFailure, - Volumes: []api.Volume{ - { - Name: "data", - VolumeSource: api.VolumeSource{ - EmptyDir: &api.EmptyDirVolumeSource{}, - }, - }, - }, - Containers: []api.Container{ - { - Name: "c", - Image: "gcr.io/google_containers/busybox:1.24", - Command: []string{"sleep", "300"}, - VolumeMounts: []api.VolumeMount{ - { - MountPath: "/data", - Name: "data", - }, - }, - }, - }, - }, - }, - }, - }, - }, - } -} - -func createScheduledJob(c *client.Client, ns string, scheduledJob *batch.ScheduledJob) (*batch.ScheduledJob, error) { - return c.Batch().ScheduledJobs(ns).Create(scheduledJob) -} - -func getScheduledJob(c *client.Client, ns, name string) (*batch.ScheduledJob, error) { - return c.Batch().ScheduledJobs(ns).Get(name) -} - -func deleteScheduledJob(c *client.Client, ns, name string) error { - return c.Batch().ScheduledJobs(ns).Delete(name, nil) -} - -// Wait for at least given amount of active jobs. -func waitForActiveJobs(c *client.Client, ns, scheduledJobName string, active int) error { - return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { - curr, err := c.Batch().ScheduledJobs(ns).Get(scheduledJobName) - if err != nil { - return false, err - } - return len(curr.Status.Active) >= active, nil - }) -} - -// Wait for no jobs to appear. -func waitForNoJobs(c *client.Client, ns, jobName string) error { - return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { - curr, err := c.Batch().ScheduledJobs(ns).Get(jobName) - if err != nil { - return false, err - } - - return len(curr.Status.Active) != 0, nil - }) -} - -// Wait for a job to be replaced with a new one. -func waitForJobReplaced(c *client.Client, ns, previousJobName string) error { - return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) { - jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{}) - if err != nil { - return false, err - } - if len(jobs.Items) != 1 { - return false, fmt.Errorf("More than one job is running") - } - return jobs.Items[0].Name != previousJobName, nil - }) -}