mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 05:27:21 +00:00
Merge pull request #26027 from soltysh/scheduledjob_e2e
Automatic merge from submit-queue Scheduledjob e2e @erictune last element of the scheduledjob puzzle. I think we'll iterate on this once we have all the puzzles in place. This is one of those things that will be allowed to merge after code freeze. ```release-note * Introducing ScheduledJobs as described in [the proposal](https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/scheduledjob.md) as part of `batch/v2alpha1` version (experimental feature). ``` []()
This commit is contained in:
commit
ac1f8bc665
@ -36,6 +36,7 @@ import (
|
|||||||
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
|
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
@ -128,6 +129,7 @@ func Run(s *options.CMServer) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
kubeconfig.ContentConfig.ContentType = s.ContentType
|
kubeconfig.ContentConfig.ContentType = s.ContentType
|
||||||
|
kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
|
||||||
// Override kubeconfig qps/burst settings from flags
|
// Override kubeconfig qps/burst settings from flags
|
||||||
kubeconfig.QPS = s.KubeAPIQPS
|
kubeconfig.QPS = s.KubeAPIQPS
|
||||||
kubeconfig.Burst = int(s.KubeAPIBurst)
|
kubeconfig.Burst = int(s.KubeAPIBurst)
|
||||||
@ -390,7 +392,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
|
|||||||
glog.Infof("Starting %s apis", groupVersion)
|
glog.Infof("Starting %s apis", groupVersion)
|
||||||
if containsResource(resources, "scheduledjobs") {
|
if containsResource(resources, "scheduledjobs") {
|
||||||
glog.Infof("Starting scheduledjob controller")
|
glog.Infof("Starting scheduledjob controller")
|
||||||
go scheduledjob.NewScheduledJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "scheduledjob-controller"))).
|
go scheduledjob.NewScheduledJobController(kubeClient).
|
||||||
Run(wait.NeverStop)
|
Run(wait.NeverStop)
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||||
|
@ -265,6 +265,8 @@ func TearDown() bool {
|
|||||||
|
|
||||||
// Up brings an e2e cluster up, recreating it if one is already running.
|
// Up brings an e2e cluster up, recreating it if one is already running.
|
||||||
func Up() bool {
|
func Up() bool {
|
||||||
|
// force having batch/v2alpha1 always on for e2e tests
|
||||||
|
os.Setenv("KUBE_RUNTIME_CONFIG", "batch/v2alpha1=true")
|
||||||
return finishRunning("up", exec.Command("./hack/e2e-internal/e2e-up.sh"))
|
return finishRunning("up", exec.Command("./hack/e2e-internal/e2e-up.sh"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,9 +37,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/apis/batch"
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
||||||
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/controller/job"
|
"k8s.io/kubernetes/pkg/controller/job"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
@ -50,20 +49,20 @@ import (
|
|||||||
// Utilities for dealing with Jobs and ScheduledJobs and time.
|
// Utilities for dealing with Jobs and ScheduledJobs and time.
|
||||||
|
|
||||||
type ScheduledJobController struct {
|
type ScheduledJobController struct {
|
||||||
kubeClient clientset.Interface
|
kubeClient *client.Client
|
||||||
jobControl jobControlInterface
|
jobControl jobControlInterface
|
||||||
sjControl sjControlInterface
|
sjControl sjControlInterface
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobController {
|
func NewScheduledJobController(kubeClient *client.Client) *ScheduledJobController {
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
eventBroadcaster.StartLogging(glog.Infof)
|
eventBroadcaster.StartLogging(glog.Infof)
|
||||||
// TODO: remove the wrapper when every clients have moved to use the clientset.
|
// TODO: remove the wrapper when every clients have moved to use the clientset.
|
||||||
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
|
||||||
|
|
||||||
if kubeClient != nil && kubeClient.Batch().GetRESTClient().GetRateLimiter() != nil {
|
if kubeClient != nil && kubeClient.GetRateLimiter() != nil {
|
||||||
metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.Batch().GetRESTClient().GetRateLimiter())
|
metrics.RegisterMetricAndTrackRateLimiterUsage("scheduledjob_controller", kubeClient.GetRateLimiter())
|
||||||
}
|
}
|
||||||
|
|
||||||
jm := &ScheduledJobController{
|
jm := &ScheduledJobController{
|
||||||
@ -76,7 +75,7 @@ func NewScheduledJobController(kubeClient clientset.Interface) *ScheduledJobCont
|
|||||||
return jm
|
return jm
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScheduledJobControllerFromClient(kubeClient clientset.Interface) *ScheduledJobController {
|
func NewScheduledJobControllerFromClient(kubeClient *client.Client) *ScheduledJobController {
|
||||||
jm := NewScheduledJobController(kubeClient)
|
jm := NewScheduledJobController(kubeClient)
|
||||||
return jm
|
return jm
|
||||||
}
|
}
|
||||||
|
@ -20,8 +20,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/apis/batch"
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -33,7 +33,7 @@ type sjControlInterface interface {
|
|||||||
|
|
||||||
// realSJControl is the default implementation of sjControlInterface.
|
// realSJControl is the default implementation of sjControlInterface.
|
||||||
type realSJControl struct {
|
type realSJControl struct {
|
||||||
KubeClient clientset.Interface
|
KubeClient *client.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ sjControlInterface = &realSJControl{}
|
var _ sjControlInterface = &realSJControl{}
|
||||||
@ -69,7 +69,7 @@ type jobControlInterface interface {
|
|||||||
|
|
||||||
// realJobControl is the default implementation of jobControlInterface.
|
// realJobControl is the default implementation of jobControlInterface.
|
||||||
type realJobControl struct {
|
type realJobControl struct {
|
||||||
KubeClient clientset.Interface
|
KubeClient *client.Client
|
||||||
Recorder record.EventRecorder
|
Recorder record.EventRecorder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,6 +32,7 @@ import (
|
|||||||
"k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
|
"k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
apierrs "k8s.io/kubernetes/pkg/api/errors"
|
apierrs "k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_2"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_2"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
|
||||||
"k8s.io/kubernetes/pkg/client/restclient"
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
@ -95,8 +96,9 @@ type TestDataSummary interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type FrameworkOptions struct {
|
type FrameworkOptions struct {
|
||||||
ClientQPS float32
|
ClientQPS float32
|
||||||
ClientBurst int
|
ClientBurst int
|
||||||
|
GroupVersion *unversioned.GroupVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
|
// NewFramework makes a new framework and sets up a BeforeEach/AfterEach for
|
||||||
@ -172,6 +174,9 @@ func (f *Framework) BeforeEach() {
|
|||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
config.QPS = f.options.ClientQPS
|
config.QPS = f.options.ClientQPS
|
||||||
config.Burst = f.options.ClientBurst
|
config.Burst = f.options.ClientBurst
|
||||||
|
if f.options.GroupVersion != nil {
|
||||||
|
config.GroupVersion = f.options.GroupVersion
|
||||||
|
}
|
||||||
if TestContext.KubeAPIContentType != "" {
|
if TestContext.KubeAPIContentType != "" {
|
||||||
config.ContentType = TestContext.KubeAPIContentType
|
config.ContentType = TestContext.KubeAPIContentType
|
||||||
}
|
}
|
||||||
|
245
test/e2e/scheduledjob.go
Normal file
245
test/e2e/scheduledjob.go
Normal file
@ -0,0 +1,245 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package e2e
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// How long to wait for a scheduledjob
|
||||||
|
scheduledJobTimeout = 5 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = framework.KubeDescribe("ScheduledJob", func() {
|
||||||
|
options := framework.FrameworkOptions{
|
||||||
|
ClientQPS: 20,
|
||||||
|
ClientBurst: 50,
|
||||||
|
GroupVersion: &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"},
|
||||||
|
}
|
||||||
|
f := framework.NewFramework("scheduledjob", options, nil)
|
||||||
|
|
||||||
|
// multiple jobs running at once
|
||||||
|
It("should schedule multiple jobs concurrently", func() {
|
||||||
|
By("Creating a scheduledjob")
|
||||||
|
scheduledJob := newTestScheduledJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent)
|
||||||
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring more than one job is running at a time")
|
||||||
|
err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 2)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring exactly two jobs exists by listing jobs explicitly")
|
||||||
|
jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(len(jobs.Items) >= 2).To(BeTrue())
|
||||||
|
|
||||||
|
By("Removing scheduledjob")
|
||||||
|
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
// suspended should not schedule jobs
|
||||||
|
It("should not schedule jobs when suspended", func() {
|
||||||
|
By("Creating a suspended scheduledjob")
|
||||||
|
scheduledJob := newTestScheduledJob("suspended", "*/1 * * * ?", batch.AllowConcurrent)
|
||||||
|
scheduledJob.Spec.Suspend = newBool(true)
|
||||||
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring no jobs are scheduled")
|
||||||
|
err = waitForNoJobs(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring no job exists by listing jobs explicitly")
|
||||||
|
jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(jobs.Items).To(HaveLen(0))
|
||||||
|
|
||||||
|
By("Removing scheduledjob")
|
||||||
|
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
// only single active job is allowed for ForbidConcurrent
|
||||||
|
It("should not schedule new jobs when ForbidConcurrent", func() {
|
||||||
|
By("Creating a ForbidConcurrent scheduledjob")
|
||||||
|
scheduledJob := newTestScheduledJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent)
|
||||||
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring a job is scheduled")
|
||||||
|
err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 1)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring exactly one is scheduled")
|
||||||
|
scheduledJob, err = getScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(scheduledJob.Status.Active).Should(HaveLen(1))
|
||||||
|
|
||||||
|
By("Ensuring exaclty one job exists by listing jobs explicitly")
|
||||||
|
jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(jobs.Items).To(HaveLen(1))
|
||||||
|
|
||||||
|
By("Ensuring no more jobs are scheduled")
|
||||||
|
err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 2)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
|
||||||
|
By("Removing scheduledjob")
|
||||||
|
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
// only single active job is allowed for ReplaceConcurrent
|
||||||
|
It("should replace jobs when ReplaceConcurrent", func() {
|
||||||
|
By("Creating a ReplaceConcurrent scheduledjob")
|
||||||
|
scheduledJob := newTestScheduledJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent)
|
||||||
|
scheduledJob, err := createScheduledJob(f.Client, f.Namespace.Name, scheduledJob)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring a job is scheduled")
|
||||||
|
err = waitForActiveJobs(f.Client, f.Namespace.Name, scheduledJob.Name, 1)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Ensuring exactly one is scheduled")
|
||||||
|
scheduledJob, err = getScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(scheduledJob.Status.Active).Should(HaveLen(1))
|
||||||
|
|
||||||
|
By("Ensuring exaclty one job exists by listing jobs explicitly")
|
||||||
|
jobs, err := f.Client.Batch().Jobs(f.Namespace.Name).List(api.ListOptions{})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(jobs.Items).To(HaveLen(1))
|
||||||
|
|
||||||
|
By("Ensuring the job is replaced with a new one")
|
||||||
|
err = waitForJobReplaced(f.Client, f.Namespace.Name, jobs.Items[0].Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
By("Removing scheduledjob")
|
||||||
|
err = deleteScheduledJob(f.Client, f.Namespace.Name, scheduledJob.Name)
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// newTestScheduledJob returns a scheduledjob which does one of several testing behaviors.
|
||||||
|
func newTestScheduledJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy) *batch.ScheduledJob {
|
||||||
|
parallelism := int32(1)
|
||||||
|
completions := int32(1)
|
||||||
|
return &batch.ScheduledJob{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
Spec: batch.ScheduledJobSpec{
|
||||||
|
Schedule: schedule,
|
||||||
|
ConcurrencyPolicy: concurrencyPolicy,
|
||||||
|
JobTemplate: batch.JobTemplateSpec{
|
||||||
|
Spec: batch.JobSpec{
|
||||||
|
Parallelism: ¶llelism,
|
||||||
|
Completions: &completions,
|
||||||
|
Template: api.PodTemplateSpec{
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
RestartPolicy: api.RestartPolicyOnFailure,
|
||||||
|
Volumes: []api.Volume{
|
||||||
|
{
|
||||||
|
Name: "data",
|
||||||
|
VolumeSource: api.VolumeSource{
|
||||||
|
EmptyDir: &api.EmptyDirVolumeSource{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Containers: []api.Container{
|
||||||
|
{
|
||||||
|
Name: "c",
|
||||||
|
Image: "gcr.io/google_containers/busybox:1.24",
|
||||||
|
Command: []string{"sleep", "300"},
|
||||||
|
VolumeMounts: []api.VolumeMount{
|
||||||
|
{
|
||||||
|
MountPath: "/data",
|
||||||
|
Name: "data",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createScheduledJob(c *client.Client, ns string, scheduledJob *batch.ScheduledJob) (*batch.ScheduledJob, error) {
|
||||||
|
return c.Batch().ScheduledJobs(ns).Create(scheduledJob)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getScheduledJob(c *client.Client, ns, name string) (*batch.ScheduledJob, error) {
|
||||||
|
return c.Batch().ScheduledJobs(ns).Get(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteScheduledJob(c *client.Client, ns, name string) error {
|
||||||
|
return c.Batch().ScheduledJobs(ns).Delete(name, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for at least given amount of active jobs.
|
||||||
|
func waitForActiveJobs(c *client.Client, ns, scheduledJobName string, active int) error {
|
||||||
|
return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) {
|
||||||
|
curr, err := c.Batch().ScheduledJobs(ns).Get(scheduledJobName)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return len(curr.Status.Active) >= active, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for no jobs to appear.
|
||||||
|
func waitForNoJobs(c *client.Client, ns, jobName string) error {
|
||||||
|
return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) {
|
||||||
|
curr, err := c.Batch().ScheduledJobs(ns).Get(jobName)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(curr.Status.Active) != 0, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for a job to be replaced with a new one.
|
||||||
|
func waitForJobReplaced(c *client.Client, ns, previousJobName string) error {
|
||||||
|
return wait.Poll(framework.Poll, scheduledJobTimeout, func() (bool, error) {
|
||||||
|
jobs, err := c.Batch().Jobs(ns).List(api.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if len(jobs.Items) != 1 {
|
||||||
|
return false, fmt.Errorf("More than one job is running")
|
||||||
|
}
|
||||||
|
return jobs.Items[0].Name != previousJobName, nil
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user