Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-06 18:54:06 +00:00)

commit 784a309b91
parent c0147ff528

    Do not error in Job controller sync when there are pod failures
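For context (not part of the diff itself): syncJob runs as the Job controller's work-queue handler, so whether it returns an error decides whether the Job key is requeued with backoff. The sketch below shows the usual client-go worker pattern under that assumption; the function shape, the queue field, and the sync callback are illustrative, not the Job controller's actual code.

package jobsketch

import (
	"context"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem sketches the standard controller worker loop: an error
// from the sync function re-enqueues the key with rate-limited backoff
// (bumping NumRequeues for that key), while a nil error calls Forget and
// clears the backoff state. Removing the "failed pod(s) detected" error from
// syncJob therefore stops pod failures alone from driving this
// requeue-with-backoff path.
func processNextWorkItem(ctx context.Context, queue workqueue.RateLimitingInterface, sync func(context.Context, string) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	if err := sync(ctx, item.(string)); err != nil {
		queue.AddRateLimited(item) // retried later; NumRequeues(item) increases
		return true
	}
	queue.Forget(item) // success: the per-key failure count resets to zero
	return true
}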
@@ -905,12 +905,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return fmt.Errorf("tracking status: %w", err)
 	}
 
-	jobFinished := IsJobFinished(&job)
-	if jobHasNewFailure && !jobFinished {
-		// returning an error will re-enqueue Job after the backoff period
-		return fmt.Errorf("failed pod(s) detected for job key %q", key)
-	}
-
 	return manageJobErr
 }
 
@@ -854,10 +854,6 @@ func TestControllerSyncJob(t *testing.T) {
 				if err == nil {
 					t.Error("Syncing jobs expected to return error on podControl exception")
 				}
-			} else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
-				if err == nil {
-					t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
-				}
 			} else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit {
 				if err == nil {
 					t.Error("Syncing jobs expected to return error when reached the podControl limit")
@@ -1704,7 +1700,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		failedPods int
 
 		// expectations
-		expectedForGetKey bool
 		expectedDeletions int32
 		expectedActive int32
 		expectedSucceeded int32
@@ -1719,7 +1714,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			startTime: 15,
 			backoffLimit: 6,
 			activePods: 1,
-			expectedForGetKey: false,
 			expectedDeletions: 1,
 			expectedFailed: 1,
 			expectedCondition: batch.JobFailed,
@@ -1733,7 +1727,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			backoffLimit: 6,
 			activePods: 1,
 			succeededPods: 1,
-			expectedForGetKey: true,
 			expectedDeletions: 1,
 			expectedSucceeded: 1,
 			expectedFailed: 1,
@@ -1746,7 +1739,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 10,
 			startTime: 10,
 			backoffLimit: 6,
-			expectedForGetKey: false,
 			expectedCondition: batch.JobFailed,
 			expectedConditionReason: "DeadlineExceeded",
 		},
@@ -1756,7 +1748,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 1,
 			startTime: 10,
 			failedPods: 1,
-			expectedForGetKey: false,
 			expectedFailed: 1,
 			expectedCondition: batch.JobFailed,
 			expectedConditionReason: "BackoffLimitExceeded",
@@ -1768,7 +1759,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 10,
 			startTime: 15,
 			backoffLimit: 6,
-			expectedForGetKey: true,
 			expectedCondition: batch.JobSuspended,
 			expectedConditionReason: "JobSuspended",
 		},
@@ -3898,80 +3888,38 @@ func bumpResourceVersion(obj metav1.Object) {
 	obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
 }
 
-type pods struct {
-	pending int
-	active  int
-	succeed int
-	failed  int
-}
-
-func TestJobBackoffReset(t *testing.T) {
+func TestJobApiBackoffReset(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
-	testCases := map[string]struct {
-		// job setup
-		parallelism  int32
-		completions  int32
-		backoffLimit int32
 
-		// pod setup - each row is additive!
-		pods []pods
-	}{
-		"parallelism=1": {
-			1, 2, 1,
-			[]pods{
-				{0, 1, 0, 1},
-				{0, 0, 1, 0},
-			},
-		},
-		"parallelism=2 (just failure)": {
-			2, 2, 1,
-			[]pods{
-				{0, 2, 0, 1},
-				{0, 0, 1, 0},
-			},
-		},
+	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	fakePodControl := controller.FakePodControl{}
+	manager.podControl = &fakePodControl
+	manager.podStoreSynced = alwaysReady
+	manager.jobStoreSynced = alwaysReady
+	manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
+		return job, nil
 	}
 
-	for name, tc := range testCases {
-		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-		defer func() { DefaultJobApiBackOff = 1 * time.Second }()
-		DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
-		manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
-		fakePodControl := controller.FakePodControl{}
-		manager.podControl = &fakePodControl
-		manager.podStoreSynced = alwaysReady
-		manager.jobStoreSynced = alwaysReady
-		var actual *batch.Job
-		manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
-			actual = job
-			return job, nil
-		}
+	job := newJob(1, 1, 2, batch.NonIndexedCompletion)
+	key := testutil.GetKey(job, t)
+	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 
-		// job & pods setup
-		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
-		key := testutil.GetKey(job, t)
-		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+	// error returned make the key requeued
+	fakePodControl.Err = errors.New("Controller error")
+	manager.queue.Add(key)
+	manager.processNextWorkItem(context.TODO())
+	retries := manager.queue.NumRequeues(key)
+	if retries != 1 {
+		t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
+	}
 
-		setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0)
-		manager.queue.Add(key)
-		manager.processNextWorkItem(context.TODO())
-		retries := manager.queue.NumRequeues(key)
-		if retries != 1 {
-			t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
-		}
-
-		job = actual
-		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
-		setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0)
-		manager.processNextWorkItem(context.TODO())
-		retries = manager.queue.NumRequeues(key)
-		if retries != 0 {
-			t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
-		}
-		if getCondition(actual, batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded") {
-			t.Errorf("%s: unexpected job failure", name)
-		}
+	// the queue is emptied on success
+	fakePodControl.Err = nil
+	manager.processNextWorkItem(context.TODO())
+	retries = manager.queue.NumRequeues(key)
+	if retries != 0 {
+		t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries)
 	}
 }
 
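The two assertions in the rewritten TestJobApiBackoffReset lean on the work queue's rate-limiter bookkeeping rather than on Job status conditions. A minimal standalone illustration of that bookkeeping follows, assuming client-go's default controller rate limiter; the key string is only an example, not a value used by the test.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	key := "default/foobar" // job keys have the namespace/name shape

	// A failed sync ends in AddRateLimited, which the first half of the test
	// observes as "expected exactly 1 retry".
	queue.AddRateLimited(key)
	fmt.Println(queue.NumRequeues(key)) // 1

	// A successful sync ends in Forget, which the second half observes as
	// "expected exactly 0 retries".
	queue.Forget(key)
	fmt.Println(queue.NumRequeues(key)) // 0
}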
@@ -4066,7 +4014,6 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 		suspend bool
 
 		// pod setup
-		jobKeyForget bool
 		restartCounts []int32
 		podPhase v1.PodPhase
 
@@ -4078,57 +4025,57 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 		expectedConditionReason string
 	}{
 		"backoffLimit 0 should have 1 pod active": {
-			1, 1, 0, false,
+			1, 1, 0,
 			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
-			1, 1, 1, false,
+			1, 1, 1,
 			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
-			1, 1, 1, false,
+			1, 1, 1,
 			false, []int32{1}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
-			1, 1, 1, false,
+			1, 1, 1,
 			false, []int32{1}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - single pod": {
-			1, 5, 2, false,
+			1, 5, 2,
 			false, []int32{2}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - single pod": {
-			1, 5, 2, false,
+			1, 5, 2,
 			false, []int32{2}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - multiple pods": {
-			2, 5, 2, false,
+			2, 5, 2,
 			false, []int32{1, 1}, v1.PodRunning,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - multiple pods": {
-			2, 5, 2, false,
+			2, 5, 2,
 			false, []int32{1, 1}, v1.PodPending,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures": {
-			2, 5, 3, false,
+			2, 5, 3,
 			false, []int32{1, 1}, v1.PodRunning,
 			2, 0, 0, nil, "",
 		},
 		"suspending a job": {
-			2, 4, 6, true,
+			2, 4, 6,
 			true, []int32{1, 1}, v1.PodRunning,
 			0, 0, 0, &jobConditionSuspended, "JobSuspended",
 		},
 		"finshed job": {
-			2, 4, 6, true,
+			2, 4, 6,
 			true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
 			0, 4, 0, &jobConditionComplete, "",
 		},
@@ -4200,8 +4147,6 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		failedPods int
 
 		// expectations
-		isExpectingAnError bool
-		jobKeyForget bool
 		expectedActive int32
 		expectedSucceeded int32
 		expectedFailed int32
@@ -4211,27 +4156,27 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"not enough failures with backoffLimit 0 - single pod": {
 			1, 1, 0,
 			v1.PodRunning, 1, 0,
-			false, false, 1, 0, 0, nil, "",
+			1, 0, 0, nil, "",
 		},
 		"not enough failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
 			"", 0, 1,
-			true, false, 1, 0, 1, nil, "",
+			1, 0, 1, nil, "",
 		},
 		"too many failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
 			"", 0, 2,
-			false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
+			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
 			v1.PodRunning, 1, 6,
-			true, false, 2, 0, 6, nil, "",
+			2, 0, 6, nil, "",
 		},
 		"too many failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
 			"", 0, 7,
-			false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
+			0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
 
@@ -4267,9 +4212,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 
 			// run
 			err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
-
-			if (err != nil) != tc.isExpectingAnError {
-				t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError)
+			if err != nil {
+				t.Fatalf("unexpected error syncing job: %#v\n", err)
 			}
 			// validate status
 			if actual.Status.Active != tc.expectedActive {
@@ -4490,23 +4434,6 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
 	}
 }
 
-// hasValidFailingPods checks if there exists failed pods with valid index.
-func hasValidFailingPods(status []indexPhase, completions int) bool {
-	for _, s := range status {
-		ix, err := strconv.Atoi(s.Index)
-		if err != nil {
-			continue
-		}
-		if ix < 0 || ix >= completions {
-			continue
-		}
-		if s.Phase == v1.PodFailed {
-			return true
-		}
-	}
-	return false
-}
-
 type podBuilder struct {
 	*v1.Pod
 }