Remove jobs that do not exist from active list of CronJob

This commit is contained in:
peay 2016-12-23 15:16:52 +00:00
parent 5f0ece92de
commit d8d69d1a36
3 changed files with 142 additions and 52 deletions

View File

@ -42,6 +42,7 @@ import (
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -127,8 +128,10 @@ func (jm *CronJobController) SyncAll() {
func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
childrenJobs := make(map[types.UID]bool)
for i := range js {
j := js[i]
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(sj, j.ObjectMeta.UID)
if !found && !IsJobFinished(&j) {
recorder.Eventf(&sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
@ -148,6 +151,17 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
recorder.Eventf(&sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
}
}
// Remove any job reference from the active list if the corresponding job does not exist any more.
// Otherwise, the cronjob may be stuck in active mode forever even though there is no matching
// job running.
for _, j := range sj.Status.Active {
if found := childrenJobs[j.UID]; !found {
recorder.Eventf(&sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(&sj, j.UID)
}
}
updatedSJ, err := sjc.UpdateStatus(&sj)
if err != nil {
glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)

View File

@ -273,6 +273,7 @@ func TestSyncOne_Status(t *testing.T) {
finishedJob := newJob("1")
finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batch.JobCondition{Type: batch.JobComplete, Status: v1.ConditionTrue})
unexpectedJob := newJob("2")
missingJob := newJob("3")
testCases := map[string]struct {
// sj spec
@ -288,52 +289,59 @@ func TestSyncOne_Status(t *testing.T) {
// environment
now time.Time
hasUnexpectedJob bool
hasMissingJob bool
// expectations
expectCreate bool
expectDelete bool
}{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, T, F},
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, F, F},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), F, F, T, F},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, F, F},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, F, F},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F},
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F},
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F},
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F},
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F},
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, F, F},
"prev ran but done, not time, finished job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F},
"prev ran but done, not time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F},
"prev ran but done, not time, missing job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, T, F, F},
"prev ran but done, not time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, F, F, F},
"prev ran but done, not time, finished job, missing job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, finished job, missing job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), T, T, F, F},
"prev ran but done, not time, finished job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, F, F},
"prev ran but done, not time, missing job, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, finished job, missing job, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, T, F, F},
"prev ran but done, not time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), T, F, F, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F},
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F},
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F},
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F},
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F},
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F},
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, T, F},
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, T, F},
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, T, F},
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, T, F},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, finished job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, unexpected job, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, finished job, unexpected job, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, finished job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, F, F},
"prev ran but done, is time, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, finished job, unexpected job, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), T, F, F, F},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, finished job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), F, F, T, F},
"prev ran but done, is time, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, T, F},
"prev ran but done, is time, finished job, unexpected job, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, T, F},
}
for name, tc := range testCases {
@ -349,7 +357,7 @@ func TestSyncOne_Status(t *testing.T) {
sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()}
sj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()}
} else {
if tc.hasFinishedJob || tc.hasUnexpectedJob {
if tc.hasFinishedJob || tc.hasUnexpectedJob || tc.hasMissingJob {
t.Errorf("%s: test setup error: this case makes no sense", name)
}
sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
@ -366,6 +374,13 @@ func TestSyncOne_Status(t *testing.T) {
if tc.hasUnexpectedJob {
jobs = append(jobs, unexpectedJob)
}
if tc.hasMissingJob {
ref, err := getRef(&missingJob)
if err != nil {
t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
}
sj.Status.Active = append(sj.Status.Active, *ref)
}
jc := &fakeJobControl{}
sjc := &fakeSJControl{}
@ -392,6 +407,9 @@ func TestSyncOne_Status(t *testing.T) {
if tc.hasUnexpectedJob {
expectedEvents++
}
if tc.hasMissingJob {
expectedEvents++
}
if len(recorder.Events) != expectedEvents {
t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events)
@ -409,6 +427,10 @@ func TestSyncOne_Status(t *testing.T) {
t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
}
if tc.hasMissingJob && inActiveList(sjc.Updates[0], missingJob.UID) {
t.Errorf("%s: expected missing job to be removed from active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
}
if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) {
t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime)
}

View File

@ -23,12 +23,16 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/v1"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
batch "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
@ -83,7 +87,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
Expect(err).NotTo(HaveOccurred())
By("Ensuring no jobs are scheduled")
err = waitForNoJobs(f.ClientSet, f.Namespace.Name, cronJob.Name)
err = waitForNoJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, false)
Expect(err).To(HaveOccurred())
By("Ensuring no job exists by listing jobs explicitly")
@ -172,13 +176,55 @@ var _ = framework.KubeDescribe("CronJob", func() {
Expect(err).NotTo(HaveOccurred())
By("Ensuring no unexpected event has happened")
err = checkNoUnexpectedEvents(f.ClientSet, f.Namespace.Name, cronJob.Name)
err = checkNoEventWithReason(f.ClientSet, f.Namespace.Name, cronJob.Name, []string{"MissingJob", "UnexpectedJob"})
Expect(err).NotTo(HaveOccurred())
By("Removing cronjob")
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
// deleted jobs should be removed from the active list
It("should remove from active list jobs that have been deleted", func() {
By("Creating a ForbidConcurrent cronjob")
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
By("Ensuring a job is scheduled")
err = waitForActiveJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, 1)
Expect(err).NotTo(HaveOccurred())
By("Ensuring exactly one is scheduled")
cronJob, err = getCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
Expect(cronJob.Status.Active).Should(HaveLen(1))
By("Deleting the job")
job := cronJob.Status.Active[0]
reaper, err := kubectl.ReaperFor(batchinternal.Kind("Job"), f.InternalClientset)
Expect(err).NotTo(HaveOccurred())
timeout := 1 * time.Minute
err = reaper.Stop(f.Namespace.Name, job.Name, timeout, api.NewDeleteOptions(0))
Expect(err).NotTo(HaveOccurred())
By("Ensuring job was deleted")
_, err = getJob(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).To(HaveOccurred())
Expect(errors.IsNotFound(err)).To(BeTrue())
By("Ensuring there are no active jobs in the cronjob")
err = waitForNoJobs(f.ClientSet, f.Namespace.Name, cronJob.Name, true)
Expect(err).NotTo(HaveOccurred())
By("Ensuring MissingJob event has occured")
err = checkNoEventWithReason(f.ClientSet, f.Namespace.Name, cronJob.Name, []string{"MissingJob"})
Expect(err).To(HaveOccurred())
By("Removing cronjob")
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
})
// newTestCronJob returns a cronjob which does one of several testing behaviors.
@ -254,15 +300,22 @@ func waitForActiveJobs(c clientset.Interface, ns, cronJobName string, active int
})
}
// Wait for no jobs to appear.
func waitForNoJobs(c clientset.Interface, ns, jobName string) error {
// Wait for jobs to appear in the active list of a cronjob or not.
// When failIfNonEmpty is set, this fails if the active set of jobs is still non-empty after
// the timeout. When failIfNonEmpty is not set, this fails if the active set of jobs is still
// empty after the timeout.
func waitForNoJobs(c clientset.Interface, ns, jobName string, failIfNonEmpty bool) error {
return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
curr, err := c.BatchV2alpha1().CronJobs(ns).Get(jobName, metav1.GetOptions{})
if err != nil {
return false, err
}
return len(curr.Status.Active) != 0, nil
if failIfNonEmpty {
return len(curr.Status.Active) == 0, nil
} else {
return len(curr.Status.Active) != 0, nil
}
})
}
@ -310,20 +363,21 @@ func waitForAnyFinishedJob(c clientset.Interface, ns string) error {
})
}
// checkNoUnexpectedEvents checks unexpected events didn't happen.
// Currently only "UnexpectedJob" is checked.
func checkNoUnexpectedEvents(c clientset.Interface, ns, cronJobName string) error {
// checkNoEventWithReason checks no events with a reason within a list has occured
func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reasons []string) error {
sj, err := c.BatchV2alpha1().CronJobs(ns).Get(cronJobName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error in getting cronjob %s/%s: %v", ns, cronJobName, err)
return fmt.Errorf("Error in getting cronjob %s/%s: %v", ns, cronJobName, err)
}
events, err := c.Core().Events(ns).Search(sj)
if err != nil {
return fmt.Errorf("error in listing events: %s", err)
return fmt.Errorf("Error in listing events: %s", err)
}
for _, e := range events.Items {
if e.Reason == "UnexpectedJob" {
return fmt.Errorf("found unexpected event: %#v", e)
for _, reason := range reasons {
if e.Reason == reason {
return fmt.Errorf("Found event with reason %s: %#v", reason, e)
}
}
}
return nil