Merge pull request #35420 from soltysh/sj_replace_fix

Automatic merge from submit-queue

Remove Job also from .status.active for Replace strategy

When iterating over list of Jobs we're removing each of them when strategy is replace. Unfortunately, the job reference was not removed from `.status.active` which cause the controller trying to remove it once again during next run and failed removing what was already removed during previous run. This was cause by not removing the reference previously. This PR fixes that and cleans logs a bit, in that controller.

@erictune fyi
@janetkuo ptal
This commit is contained in:
Kubernetes Submit Queue 2016-10-30 05:08:43 -07:00 committed by GitHub
commit 7d911417c2
3 changed files with 63 additions and 51 deletions

View File

@ -169,7 +169,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
return return
} }
if len(times) > 1 { if len(times) > 1 {
glog.Errorf("Multiple unmet start times for %s so only starting last one", nameForLog) glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
} }
scheduledTime := times[len(times)-1] scheduledTime := times[len(times)-1]
tooLate := false tooLate := false
@ -177,7 +177,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now) tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
} }
if tooLate { if tooLate {
glog.Errorf("Missed starting window for %s", nameForLog) glog.V(4).Infof("Missed starting window for %s", nameForLog)
// TODO: generate an event for a miss. Use a warning level event because it indicates a // TODO: generate an event for a miss. Use a warning level event because it indicates a
// problem with the controller (restart or long queue), and is not expected by user either. // problem with the controller (restart or long queue), and is not expected by user either.
// Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing // Since we don't set LastScheduleTime when not scheduling, we are going to keep noticing
@ -199,14 +199,15 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
// TODO: for Forbid, we could use the same name for every execution, as a lock. // TODO: for Forbid, we could use the same name for every execution, as a lock.
// With replace, we could use a name that is deterministic per execution time. // With replace, we could use a name that is deterministic per execution time.
// But that would mean that you could not inspect prior successes or failures of Forbid jobs. // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid.", nameForLog) glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
return return
} }
if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent { if sj.Spec.ConcurrencyPolicy == batch.ReplaceConcurrent {
for _, j := range sj.Status.Active { for i := range sj.Status.Active {
j := sj.Status.Active[i]
// TODO: this should be replaced with server side job deletion // TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go // currently this mimics JobReaper from pkg/kubectl/stop.go
glog.V(4).Infof("Deleting job %s of %s s that was still running at next scheduled start time", j.Name, nameForLog) glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
job, err := jc.GetJob(j.Namespace, j.Name) job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil { if err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err) recorder.Eventf(&sj, api.EventTypeWarning, "FailedGet", "Get job: %v", err)
@ -242,11 +243,14 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList)) recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
return return
} }
// ... and the job itself // ... the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil { if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err) recorder.Eventf(&sj, api.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return return
} }
// ... and its reference from active list
deleteFromActiveList(&sj, job.ObjectMeta.UID)
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name) recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
} }
} }
@ -261,6 +265,7 @@ func SyncOne(sj batch.ScheduledJob, js []batch.Job, now time.Time, jc jobControl
recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) recorder.Eventf(&sj, api.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return return
} }
glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) recorder.Eventf(&sj, api.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ // // ------------------------------------------------------------------ //

View File

@ -79,7 +79,7 @@ func scheduledJob() batch.ScheduledJob {
Name: "myscheduledjob", Name: "myscheduledjob",
Namespace: "snazzycats", Namespace: "snazzycats",
UID: types.UID("1a2b3c"), UID: types.UID("1a2b3c"),
SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/jobs/myscheduledjob", SelfLink: "/apis/batch/v2alpha1/namespaces/snazzycats/scheduledjobs/myscheduledjob",
CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()}, CreationTimestamp: unversioned.Time{Time: justBeforeTheHour()},
}, },
Spec: batch.ScheduledJobSpec{ Spec: batch.ScheduledJobSpec{
@ -140,7 +140,6 @@ var (
) )
func TestSyncOne_RunOrNot(t *testing.T) { func TestSyncOne_RunOrNot(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
// sj spec // sj spec
concurrencyPolicy batch.ConcurrencyPolicy concurrencyPolicy batch.ConcurrencyPolicy
@ -158,39 +157,39 @@ func TestSyncOne_RunOrNot(t *testing.T) {
// expectations // expectations
expectCreate bool expectCreate bool
expectDelete bool expectDelete bool
expectActive int
}{ }{
"never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, "never ran, not time, A": {A, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0},
"never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, "never ran, not time, F": {f, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0},
"never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F}, "never ran, not time, R": {R, F, onTheHour, noDead, F, F, justBeforeTheHour(), F, F, 0},
"never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, A": {A, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F, 1},
"never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, F": {f, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F, 1},
"never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, R": {R, F, onTheHour, noDead, F, F, justAfterTheHour(), T, F, 1},
"never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F}, "never ran, is time, suspended": {A, T, onTheHour, noDead, F, F, justAfterTheHour(), F, F, 0},
"never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F}, "never ran, is time, past deadline": {A, F, onTheHour, shortDead, F, F, justAfterTheHour(), F, F, 0},
"never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F}, "never ran, is time, not past deadline": {A, F, onTheHour, longDead, F, F, justAfterTheHour(), T, F, 1},
"prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, "prev ran but done, not time, A": {A, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0},
"prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, "prev ran but done, not time, F": {f, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0},
"prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F}, "prev ran but done, not time, R": {R, F, onTheHour, noDead, T, F, justBeforeTheHour(), F, F, 0},
"prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, A": {A, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, 1},
"prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, F": {f, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, 1},
"prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, R": {R, F, onTheHour, noDead, T, F, justAfterTheHour(), T, F, 1},
"prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F}, "prev ran but done, is time, suspended": {A, T, onTheHour, noDead, T, F, justAfterTheHour(), F, F, 0},
"prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F}, "prev ran but done, is time, past deadline": {A, F, onTheHour, shortDead, T, F, justAfterTheHour(), F, F, 0},
"prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F}, "prev ran but done, is time, not past deadline": {A, F, onTheHour, longDead, T, F, justAfterTheHour(), T, F, 1},
"still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, "still active, not time, A": {A, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1},
"still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, "still active, not time, F": {f, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1},
"still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F}, "still active, not time, R": {R, F, onTheHour, noDead, T, T, justBeforeTheHour(), F, F, 1},
"still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F}, "still active, is time, A": {A, F, onTheHour, noDead, T, T, justAfterTheHour(), T, F, 2},
"still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F}, "still active, is time, F": {f, F, onTheHour, noDead, T, T, justAfterTheHour(), F, F, 1},
"still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T}, "still active, is time, R": {R, F, onTheHour, noDead, T, T, justAfterTheHour(), T, T, 1},
"still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F}, "still active, is time, suspended": {A, T, onTheHour, noDead, T, T, justAfterTheHour(), F, F, 1},
"still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F}, "still active, is time, past deadline": {A, F, onTheHour, shortDead, T, T, justAfterTheHour(), F, F, 1},
"still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F}, "still active, is time, not past deadline": {A, F, onTheHour, longDead, T, T, justAfterTheHour(), T, F, 2},
} }
for name, tc := range testCases { for name, tc := range testCases {
t.Log("Test case:", name)
sj := scheduledJob() sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
sj.Spec.Suspend = &tc.suspend sj.Spec.Suspend = &tc.suspend
@ -209,7 +208,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time) job, err = getJobFromTemplate(&sj, sj.Status.LastScheduleTime.Time)
if err != nil { if err != nil {
t.Fatalf("Unexpected error creating a job from template: %v", err) t.Fatalf("%s: nexpected error creating a job from template: %v", name, err)
} }
job.UID = "1234" job.UID = "1234"
job.Namespace = "" job.Namespace = ""
@ -220,7 +219,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
} else { } else {
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
if tc.stillActive { if tc.stillActive {
t.Errorf("Test setup error: this case makes no sense.") t.Errorf("%s: test setup error: this case makes no sense", name)
} }
} }
@ -235,7 +234,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
expectedCreates = 1 expectedCreates = 1
} }
if len(jc.Jobs) != expectedCreates { if len(jc.Jobs) != expectedCreates {
t.Errorf("Expected %d job started, actually %v", expectedCreates, len(jc.Jobs)) t.Errorf("%s: expected %d job started, actually %v", name, expectedCreates, len(jc.Jobs))
} }
expectedDeletes := 0 expectedDeletes := 0
@ -243,18 +242,25 @@ func TestSyncOne_RunOrNot(t *testing.T) {
expectedDeletes = 1 expectedDeletes = 1
} }
if len(jc.DeleteJobName) != expectedDeletes { if len(jc.DeleteJobName) != expectedDeletes {
t.Errorf("Expected %d job deleted, actually %v", expectedDeletes, len(jc.DeleteJobName)) t.Errorf("%s: expected %d job deleted, actually %v", name, expectedDeletes, len(jc.DeleteJobName))
} }
// Status update happens once when ranging through job list, and another one if create jobs.
expectUpdates := 1
expectedEvents := 0 expectedEvents := 0
if tc.expectCreate { if tc.expectCreate {
expectedEvents += 1 expectedEvents++
expectUpdates++
} }
if tc.expectDelete { if tc.expectDelete {
expectedEvents += 1 expectedEvents++
} }
if len(recorder.Events) != expectedEvents { if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v", expectedEvents, len(recorder.Events)) t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
}
if tc.expectActive != len(sjc.Updates[expectUpdates-1].Status.Active) {
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, len(sjc.Updates[expectUpdates-1].Status.Active))
} }
} }
} }
@ -331,7 +337,6 @@ func TestSyncOne_Status(t *testing.T) {
} }
for name, tc := range testCases { for name, tc := range testCases {
t.Log("Test case:", name)
// Setup the test // Setup the test
sj := scheduledJob() sj := scheduledJob()
sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy sj.Spec.ConcurrencyPolicy = tc.concurrencyPolicy
@ -345,7 +350,7 @@ func TestSyncOne_Status(t *testing.T) {
sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()} sj.Status.LastScheduleTime = &unversioned.Time{Time: justAfterThePriorHour()}
} else { } else {
if tc.hasFinishedJob || tc.hasUnexpectedJob { if tc.hasFinishedJob || tc.hasUnexpectedJob {
t.Errorf("Test setup error: this case makes no sense.") t.Errorf("%s: test setup error: this case makes no sense", name)
} }
sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()} sj.ObjectMeta.CreationTimestamp = unversioned.Time{Time: justBeforeTheHour()}
} }
@ -353,7 +358,7 @@ func TestSyncOne_Status(t *testing.T) {
if tc.hasFinishedJob { if tc.hasFinishedJob {
ref, err := getRef(&finishedJob) ref, err := getRef(&finishedJob)
if err != nil { if err != nil {
t.Errorf("Test setup error: failed to get job's ref: %v.", err) t.Errorf("%s: test setup error: failed to get job's ref: %v.", name, err)
} }
sj.Status.Active = []api.ObjectReference{*ref} sj.Status.Active = []api.ObjectReference{*ref}
jobs = append(jobs, finishedJob) jobs = append(jobs, finishedJob)
@ -389,23 +394,23 @@ func TestSyncOne_Status(t *testing.T) {
} }
if len(recorder.Events) != expectedEvents { if len(recorder.Events) != expectedEvents {
t.Errorf("Expected %d event, actually %v: %#v", expectedEvents, len(recorder.Events), recorder.Events) t.Errorf("%s: expected %d event, actually %v: %#v", name, expectedEvents, len(recorder.Events), recorder.Events)
} }
if expectUpdates != len(sjc.Updates) { if expectUpdates != len(sjc.Updates) {
t.Errorf("expected %d status updates, actually %d", expectUpdates, len(sjc.Updates)) t.Errorf("%s: expected %d status updates, actually %d", name, expectUpdates, len(sjc.Updates))
} }
if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) { if tc.hasFinishedJob && inActiveList(sjc.Updates[0], finishedJob.UID) {
t.Errorf("Expected finished job removed from active list, actually active list = %#v.", sjc.Updates[0].Status.Active) t.Errorf("%s: expected finished job removed from active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
} }
if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) { if tc.hasUnexpectedJob && inActiveList(sjc.Updates[0], unexpectedJob.UID) {
t.Errorf("Expected unexpected job not added to active list, actually active list = %#v.", sjc.Updates[0].Status.Active) t.Errorf("%s: expected unexpected job not added to active list, actually active list = %#v", name, sjc.Updates[0].Status.Active)
} }
if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) { if tc.expectCreate && !sjc.Updates[1].Status.LastScheduleTime.Time.Equal(topOfTheHour()) {
t.Errorf("Expected LastScheduleTime updated to %s, got %s.", topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime) t.Errorf("%s: expected LastScheduleTime updated to %s, got %s", name, topOfTheHour(), sjc.Updates[1].Status.LastScheduleTime)
} }
} }
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduledjob package scheduledjob
import ( import (
"fmt"
"sync" "sync"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -127,6 +128,7 @@ func (f *fakeJobControl) CreateJob(namespace string, job *batch.Job) (*batch.Job
if f.Err != nil { if f.Err != nil {
return nil, f.Err return nil, f.Err
} }
job.SelfLink = fmt.Sprintf("/api/batch/v1/namespaces/%s/jobs/%s", namespace, job.Name)
f.Jobs = append(f.Jobs, *job) f.Jobs = append(f.Jobs, *job)
job.UID = "test-uid" job.UID = "test-uid"
return job, nil return job, nil