Merge pull request #48454 from weiwei04/check-job-activeDeadlineSeconds
Automatic merge from submit-queue (batch tested with PRs 44719, 48454)

check job ActiveDeadlineSeconds

**What this PR does / why we need it**: enqueue a sync task after ActiveDeadlineSeconds

**Which issue this PR fixes**: fixes #32149

**Special notes for your reviewer**:

**Release note**:
```release-note
enqueue a sync task to wake up the job controller to check job ActiveDeadlineSeconds in time
```
Commit 25da6e64e2
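The core idea of the change is small: when a Job first gets a start time, or its `ActiveDeadlineSeconds` changes, the controller computes how much of the deadline budget remains and enqueues a delayed sync for exactly that moment, so the existing deadline check in `syncJob` fires on time instead of waiting for an unrelated pod event. The snippet below is a minimal, stand-alone sketch of that computation using only the standard library; `remainingDeadline` and the example values are illustrative, not part of the controller.

```go
package main

import (
	"fmt"
	"time"
)

// remainingDeadline returns how long until a job started at startTime hits
// its activeDeadlineSeconds. A non-positive result means the deadline has
// already passed and the job should be synced immediately.
func remainingDeadline(startTime time.Time, activeDeadlineSeconds int64) time.Duration {
	total := time.Duration(activeDeadlineSeconds) * time.Second
	passed := time.Since(startTime)
	return total - passed
}

func main() {
	start := time.Now().Add(-30 * time.Second) // job has been running ~30s
	delay := remainingDeadline(start, 60)      // deadline of 60s
	// In the controller this delay would be passed to jm.queue.AddAfter(key, delay).
	fmt.Printf("requeue job sync after roughly %v\n", delay.Round(time.Second))
}
```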
@@ -82,10 +82,10 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	// TODO: remove the wrapper when every clients have moved to use the clientset.
-	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
+	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
 
-	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
-		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.Core().RESTClient().GetRateLimiter())
+	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
+		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
 	}
 
 	jm := &JobController{
@@ -100,12 +100,8 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 	}
 
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: jm.enqueueController,
-		UpdateFunc: func(old, cur interface{}) {
-			if job := cur.(*batch.Job); !IsJobFinished(job) {
-				jm.enqueueController(job)
-			}
-		},
+		UpdateFunc: jm.updateJob,
 		DeleteFunc: jm.enqueueController,
 	})
 	jm.jobLister = jobInformer.Lister()
@@ -306,6 +302,35 @@ func (jm *JobController) deletePod(obj interface{}) {
 	jm.enqueueController(job)
 }
 
+func (jm *JobController) updateJob(old, cur interface{}) {
+	oldJob := old.(*batch.Job)
+	curJob := cur.(*batch.Job)
+
+	// never return error
+	key, err := controller.KeyFunc(curJob)
+	if err != nil {
+		return
+	}
+	jm.queue.Add(key)
+	// check if need to add a new rsync for ActiveDeadlineSeconds
+	if curJob.Status.StartTime != nil {
+		curADS := curJob.Spec.ActiveDeadlineSeconds
+		if curADS == nil {
+			return
+		}
+		oldADS := oldJob.Spec.ActiveDeadlineSeconds
+		if oldADS == nil || *oldADS != *curADS {
+			now := metav1.Now()
+			start := curJob.Status.StartTime.Time
+			passed := now.Time.Sub(start)
+			total := time.Duration(*curADS) * time.Second
+			// AddAfter will handle total < passed
+			jm.queue.AddAfter(key, total-passed)
+			glog.V(4).Infof("job ActiveDeadlineSeconds updated, will rsync after %d seconds", total-passed)
+		}
+	}
+}
+
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *JobController) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
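The `// AddAfter will handle total < passed` comment relies on the behavior of client-go's delaying work queue: a zero or negative delay degenerates to an immediate add, so a job whose deadline has already elapsed is synced on the next worker loop. A small sketch of that behavior, assuming a Go module that already depends on `k8s.io/client-go`; the queue and key below are illustrative, not the controller's actual queue.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A delaying queue of the kind controllers use for namespaced keys.
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// "total - passed" can be negative when the deadline already elapsed;
	// AddAfter treats a non-positive delay as an immediate Add.
	q.AddAfter("default/my-job", -5*time.Second)

	item, _ := q.Get() // returns right away rather than blocking for 5s
	fmt.Println("dequeued:", item)
	q.Done(item)
}
```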
@@ -420,9 +445,16 @@ func (jm *JobController) syncJob(key string) error {
 	active := int32(len(activePods))
 	succeeded, failed := getStatus(pods)
 	conditions := len(job.Status.Conditions)
+	// job first start
 	if job.Status.StartTime == nil {
 		now := metav1.Now()
 		job.Status.StartTime = &now
+		// enqueue a sync to check if job past ActiveDeadlineSeconds
+		if job.Spec.ActiveDeadlineSeconds != nil {
+			glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
+				key, *job.Spec.ActiveDeadlineSeconds)
+			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
+		}
 	}
 	// if job was finished previously, we don't want to redo the termination
 	if IsJobFinished(&job) {
@@ -39,7 +39,7 @@ var _ = SIGDescribe("Job", func() {
 	// Simplest case: all pods succeed promptly
 	It("should run a job to completion when tasks succeed", func() {
 		By("Creating a job")
-		job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions)
+		job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -58,7 +58,7 @@ var _ = SIGDescribe("Job", func() {
 		// up to 5 minutes between restarts, making test timeouts
 		// due to successive failures too likely with a reasonable
 		// test timeout.
-		job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions)
+		job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -76,7 +76,7 @@ var _ = SIGDescribe("Job", func() {
 		// Worst case analysis: 15 failures, each taking 1 minute to
 		// run due to some slowness, 1 in 2^15 chance of happening,
 		// causing test flake. Should be very rare.
-		job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions)
+		job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -85,9 +85,20 @@ var _ = SIGDescribe("Job", func() {
 		Expect(err).NotTo(HaveOccurred())
 	})
 
+	It("should exceed active deadline", func() {
+		By("Creating a job")
+		var activeDeadlineSeconds int64 = 1
+		job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds)
+		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
+		Expect(err).NotTo(HaveOccurred())
+		By("Ensuring job past active deadline")
+		err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+10)*time.Second, "DeadlineExceeded")
+		Expect(err).NotTo(HaveOccurred())
+	})
+
 	It("should delete a job", func() {
 		By("Creating a job")
-		job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions)
+		job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -110,7 +121,7 @@ var _ = SIGDescribe("Job", func() {
 
 	It("should adopt matching orphans and release non-matching pods", func() {
 		By("Creating a job")
-		job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions)
+		job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil)
 		// Replace job with the one returned from Create() so it has the UID.
 		// Save Kind since it won't be populated in the returned job.
 		kind := job.Kind
@@ -43,7 +43,7 @@ const (
 // first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart
 // policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the
 // Job's required number of completions.
-func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32) *batch.Job {
+func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64) *batch.Job {
 	job := &batch.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
@@ -52,9 +52,10 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl
 			Kind: "Job",
 		},
 		Spec: batch.JobSpec{
-			Parallelism:    &parallelism,
-			Completions:    &completions,
-			ManualSelector: newBool(false),
+			ActiveDeadlineSeconds: activeDeadlineSeconds,
+			Parallelism:           &parallelism,
+			Completions:           &completions,
+			ManualSelector:        newBool(false),
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: map[string]string{JobSelectorKey: name},
@@ -191,7 +192,7 @@ func WaitForJobFinish(c clientset.Interface, ns, jobName string, completions int
 }
 
 // WaitForJobFailure uses c to wait for up to timeout for the Job named jobName in namespace ns to fail.
-func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration) error {
+func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
 	return wait.Poll(Poll, timeout, func() (bool, error) {
 		curr, err := c.Batch().Jobs(ns).Get(jobName, metav1.GetOptions{})
 		if err != nil {
@@ -199,7 +200,9 @@ func WaitForJobFailure(c clientset.Interface, ns, jobName string, timeout time.D
 		}
 		for _, c := range curr.Status.Conditions {
 			if c.Type == batch.JobFailed && c.Status == v1.ConditionTrue {
-				return true, nil
+				if reason == "" || reason == c.Reason {
+					return true, nil
+				}
 			}
 		}
 		return false, nil
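With the extra `reason` argument, callers can wait for a specific failure condition (for example `"DeadlineExceeded"`), or pass an empty string to accept any failure. Below is a hedged, cluster-free sketch of just the condition-matching logic; `JobCondition` here is a stand-in struct rather than the real `batch.JobCondition`, and the helper name is illustrative.

```go
package main

import "fmt"

// JobCondition is a stand-in for batch.JobCondition with only the fields
// the check needs.
type JobCondition struct {
	Type   string // e.g. "Failed"
	Status string // e.g. "True"
	Reason string // e.g. "DeadlineExceeded"
}

// jobFailedWithReason mirrors the check added to WaitForJobFailure: a job
// counts as failed when a Failed=True condition exists, and an empty reason
// matches any failure reason.
func jobFailedWithReason(conds []JobCondition, reason string) bool {
	for _, c := range conds {
		if c.Type == "Failed" && c.Status == "True" {
			if reason == "" || reason == c.Reason {
				return true
			}
		}
	}
	return false
}

func main() {
	conds := []JobCondition{{Type: "Failed", Status: "True", Reason: "DeadlineExceeded"}}
	fmt.Println(jobFailedWithReason(conds, "DeadlineExceeded"))     // true
	fmt.Println(jobFailedWithReason(conds, "BackoffLimitExceeded")) // false
	fmt.Println(jobFailedWithReason(conds, ""))                     // true: any reason
}
```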
@@ -420,7 +420,7 @@ var _ = framework.KubeDescribe("[sig-apps] Network Partition [Disruptive] [Slow]
 		completions := int32(4)
 
 		job := framework.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
-			parallelism, completions)
+			parallelism, completions, nil)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 		label := labels.SelectorFromSet(labels.Set(map[string]string{framework.JobSelectorKey: job.Name}))
@@ -39,7 +39,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
 	t.namespace = f.Namespace.Name
 
 	By("Creating a job")
-	t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2)
+	t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil)
 	job, err := framework.CreateJob(f.ClientSet, t.namespace, t.job)
 	t.job = job
 	Expect(err).NotTo(HaveOccurred())