Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-23 11:50:44 +00:00
Merge pull request #56702 from soltysh/backoff_limit_reset
Automatic merge from submit-queue (batch tested with PRs 54902, 56831, 56702, 56287, 56878). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Add job controller test verifying that backoff is reset on success

This adds a test for the job controller proving that the backoff count is reset after a successful run.

Fixes https://github.com/kubernetes/kubernetes/issues/54904

```release-note
NONE
```
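For context, the assertion in the new test leans on client-go's rate-limiting workqueue: a failed sync requeues the job key with `AddRateLimited`, growing the per-key backoff counter, while a successful sync calls `Forget`, which zeroes it. Below is a minimal, self-contained sketch of that queue behavior (assuming only `k8s.io/client-go/util/workqueue`; this is not the controller's actual wiring):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// The same flavor of queue the job controller retries from.
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	key := "default/my-job"

	// A failed sync requeues the key and accrues one backoff step.
	q.AddRateLimited(key)
	fmt.Println("after failure:", q.NumRequeues(key)) // 1

	// A successful sync forgets the key, resetting the backoff counter —
	// the behavior TestJobBackoffReset below pins down.
	q.Forget(key)
	fmt.Println("after success:", q.NumRequeues(key)) // 0
}
```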
This commit is contained in: bf34ba3cb9
```diff
@@ -102,24 +102,43 @@ func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod con
 	return jm, sharedInformers
 }
 
+func newPod(name string, job *batch.Job) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Labels:          job.Spec.Selector.MatchLabels,
+			Namespace:       job.Namespace,
+			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
+		},
+	}
+}
+
 // create count pods with the given phase for the given job
 func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
 	pods := []v1.Pod{}
 	for i := int32(0); i < count; i++ {
-		newPod := v1.Pod{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:            fmt.Sprintf("pod-%v", rand.String(10)),
-				Labels:          job.Spec.Selector.MatchLabels,
-				Namespace:       job.Namespace,
-				OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
-			},
-			Status: v1.PodStatus{Phase: status},
-		}
-		pods = append(pods, newPod)
+		newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
+		newPod.Status = v1.PodStatus{Phase: status}
+		pods = append(pods, *newPod)
 	}
 	return pods
 }
 
+func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods int32) {
+	for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
+		podIndexer.Add(&pod)
+	}
+	for _, pod := range newPodList(activePods, v1.PodRunning, job) {
+		podIndexer.Add(&pod)
+	}
+	for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) {
+		podIndexer.Add(&pod)
+	}
+	for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
+		podIndexer.Add(&pod)
+	}
+}
+
 func TestControllerSyncJob(t *testing.T) {
 	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
```
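The two helpers above are what the later hunks switch the tests over to. A hypothetical stand-alone usage against a bare `cache.Indexer` (the counts here are invented for illustration; the tests themselves go through the shared informer factory, as shown below):

```go
// Assumes the test file's existing imports (k8s.io/client-go/tools/cache, etc.).
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
job := newJob(2 /*parallelism*/, 5 /*completions*/, 6 /*backoffLimit*/)

// Seed the indexer with 1 pending, 2 running, 1 succeeded, 0 failed pods.
setPodsStatuses(indexer, job, 1, 2, 1, 0)
```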
```diff
@@ -273,18 +292,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-		for _, pod := range newPodList(tc.pendingPods, v1.PodPending, job) {
-			podIndexer.Add(&pod)
-		}
-		for _, pod := range newPodList(tc.activePods, v1.PodRunning, job) {
-			podIndexer.Add(&pod)
-		}
-		for _, pod := range newPodList(tc.succeededPods, v1.PodSucceeded, job) {
-			podIndexer.Add(&pod)
-		}
-		for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) {
-			podIndexer.Add(&pod)
-		}
+		setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods)
 
 		// run
 		forget, err := manager.syncJob(getKey(job, t))
```
```diff
@@ -424,15 +432,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		job.Status.StartTime = &start
 		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
-		for _, pod := range newPodList(tc.activePods, v1.PodRunning, job) {
-			podIndexer.Add(&pod)
-		}
-		for _, pod := range newPodList(tc.succeededPods, v1.PodSucceeded, job) {
-			podIndexer.Add(&pod)
-		}
-		for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) {
-			podIndexer.Add(&pod)
-		}
+		setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods)
 
 		// run
 		forget, err := manager.syncJob(getKey(job, t))
```
```diff
@@ -680,17 +680,6 @@ func TestJobPodLookup(t *testing.T) {
 	}
 }
 
-func newPod(name string, job *batch.Job) *v1.Pod {
-	return &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:            name,
-			Labels:          job.Spec.Selector.MatchLabels,
-			Namespace:       job.Namespace,
-			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
-		},
-	}
-}
-
 func TestGetPodsForJob(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
 	jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
```
```diff
@@ -1269,3 +1258,78 @@ func bumpResourceVersion(obj metav1.Object) {
 	ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
 	obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
 }
+
+type pods struct {
+	pending int32
+	active  int32
+	succeed int32
+	failed  int32
+}
+
+func TestJobBackoffReset(t *testing.T) {
+	testCases := map[string]struct {
+		// job setup
+		parallelism  int32
+		completions  int32
+		backoffLimit int32
+
+		// pod setup - each row is additive!
+		pods []pods
+	}{
+		"parallelism=1": {
+			1, 2, 1,
+			[]pods{
+				{0, 1, 0, 1},
+				{0, 0, 1, 0},
+			},
+		},
+		"parallelism=2 (just failure)": {
+			2, 2, 1,
+			[]pods{
+				{0, 2, 0, 1},
+				{0, 0, 1, 0},
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+		manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+		fakePodControl := controller.FakePodControl{}
+		manager.podControl = &fakePodControl
+		manager.podStoreSynced = alwaysReady
+		manager.jobStoreSynced = alwaysReady
+		var actual *batch.Job
+		manager.updateHandler = func(job *batch.Job) error {
+			actual = job
+			return nil
+		}
+
+		// job & pods setup
+		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
+		key := getKey(job, t)
+		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
+		podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+
+		setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed)
+		manager.queue.Add(key)
+		manager.processNextWorkItem()
+		retries := manager.queue.NumRequeues(key)
+		if retries != 1 {
+			t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
+		}
+
+		job = actual
+		sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
+		setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed)
+		manager.processNextWorkItem()
+		retries = manager.queue.NumRequeues(key)
+		if retries != 0 {
+			t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
+		}
+		if getCondition(actual, batch.JobFailed, "BackoffLimitExceeded") {
+			t.Errorf("%s: unexpected job failure", name)
+		}
+	}
+}
```
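`TestJobBackoffReset` calls a `getCondition` helper that this diff does not add, so it presumably already exists elsewhere in the test file. A plausible shape, written here as an assumption rather than the actual helper:

```go
// Assumed reconstruction: reports whether the job carries a condition of the
// given type, with status True and the given reason.
func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool {
	for _, v := range job.Status.Conditions {
		if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason {
			return true
		}
	}
	return false
}
```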
```diff
@@ -77,12 +77,15 @@ var _ = SIGDescribe("Job", func() {
 		// Worst case analysis: 15 failures, each taking 1 minute to
 		// run due to some slowness, 1 in 2^15 chance of happening,
 		// causing test flake. Should be very rare.
-		job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, 999)
+		// With the introduction of backoff limit and high failure rate this
+		// is hitting its timeout, the 3 is a reasonable value that should make
+		// this test less flaky, for now.
+		job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, 3, nil, 999)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Ensuring job reaches completions")
-		err = framework.WaitForJobFinish(f.ClientSet, f.Namespace.Name, job.Name, completions)
+		err = framework.WaitForJobFinish(f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
 		Expect(err).NotTo(HaveOccurred())
 	})
 
```