Merge pull request #60985 from soltysh/issue59918

Automatic merge from submit-queue (batch tested with PRs 60978, 60985). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Backoff only when failed pod shows up

**What this PR does / why we need it**:
Since the backoff policy was introduced, we delay sync runs for a job that has already failed several times. As a result, such jobs do not report status right away even when the triggering event is unrelated to a pod failure, e.g. a successful run. This PR ensures the backoff is applied only when `updatePod` receives a failed pod.
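
In outline, the change computes a zero delay for any pod update that is not a failure and keeps the exponential backoff otherwise. Below is a minimal, self-contained sketch of that rule; the helper name `backoffFor` and the `defaultJobBackOff` constant are illustrative only, the actual change is in `updatePod`/`enqueueController` in the diff further down.

```go
// Sketch only: models the "backoff only on failed pods" rule, not the
// controller itself. defaultJobBackOff and backoffFor are made-up names.
package main

import (
	"fmt"
	"math"
	"time"
)

const defaultJobBackOff = 10 * time.Second

// backoffFor returns the delay before the next job sync: zero when the pod
// update is not a failure (sync immediately), exponential in the number of
// previous requeues when it is.
func backoffFor(podFailed bool, requeues int) time.Duration {
	immediate := !podFailed
	if immediate || requeues <= 0 {
		return 0
	}
	return time.Duration(float64(defaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(requeues-1)))
}

func main() {
	fmt.Println(backoffFor(false, 2)) // succeeded/running pod: sync immediately -> 0s
	fmt.Println(backoffFor(true, 0))  // 1st failure: still no delay -> 0s
	fmt.Println(backoffFor(true, 2))  // 3rd failure: 10s * 2^1 -> 20s
}
```

This mirrors the expectations encoded in the new `TestJobBackoff` table below: multiples of `DefaultJobBackOff` for repeated failures, zero delay otherwise.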

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #59918, fixes #59527

/assign @janetkuo @kow3ns 

**Release note**:
```release-note
None
```
Kubernetes Submit Queue 2018-03-15 22:55:02 -07:00 committed by GitHub
commit 5d67222592
3 changed files with 94 additions and 15 deletions

pkg/controller/job/BUILD

@@ -64,6 +64,7 @@ go_test(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

pkg/controller/job/job_controller.go

@@ -109,9 +109,13 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
}
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
AddFunc: func(obj interface{}) {
jm.enqueueController(obj, true)
},
UpdateFunc: jm.updateJob,
DeleteFunc: jm.enqueueController,
DeleteFunc: func(obj interface{}) {
jm.enqueueController(obj, true)
},
})
jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced
@@ -209,7 +213,7 @@ func (jm *JobController) addPod(obj interface{}) {
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
jm.enqueueController(job, true)
return
}
@@ -218,7 +222,7 @@ func (jm *JobController) addPod(obj interface{}) {
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueController(job)
jm.enqueueController(job, true)
}
}
@@ -242,7 +246,8 @@ func (jm *JobController) updatePod(old, cur interface{}) {
return
}
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
// the only time we want the backoff to kick-in, is when the pod failed
immediate := curPod.Status.Phase != v1.PodFailed
curControllerRef := metav1.GetControllerOf(curPod)
oldControllerRef := metav1.GetControllerOf(oldPod)
@@ -250,7 +255,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
jm.enqueueController(job)
jm.enqueueController(job, immediate)
}
}
@@ -260,15 +265,16 @@ func (jm *JobController) updatePod(old, cur interface{}) {
if job == nil {
return
}
jm.enqueueController(job)
jm.enqueueController(job, immediate)
return
}
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueController(job)
jm.enqueueController(job, immediate)
}
}
}
@@ -309,7 +315,7 @@ func (jm *JobController) deletePod(obj interface{}) {
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
jm.enqueueController(job, true)
}
func (jm *JobController) updateJob(old, cur interface{}) {
@@ -321,7 +327,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
if err != nil {
return
}
jm.enqueueController(curJob)
jm.enqueueController(curJob, true)
// check if need to add a new rsync for ActiveDeadlineSeconds
if curJob.Status.StartTime != nil {
curADS := curJob.Spec.ActiveDeadlineSeconds
@@ -341,15 +347,19 @@ func (jm *JobController) updateJob(old, cur interface{}) {
}
}
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
func (jm *JobController) enqueueController(job interface{}) {
key, err := controller.KeyFunc(job)
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
// immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run.
func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
// Retrieves the backoff duration for this Job
if immediate {
jm.queue.Forget(key)
}
backoff := getBackoff(jm.queue, key)
// TODO: Handle overlapping controllers better. Either disallow them at admission time or

pkg/controller/job/job_controller_test.go

@@ -36,6 +36,7 @@ import (
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
@@ -1338,3 +1339,70 @@ func TestJobBackoffReset(t *testing.T) {
}
}
}
var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}
type fakeRateLimitingQueue struct {
workqueue.Interface
requeues int
item interface{}
duration time.Duration
}
func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
func (f *fakeRateLimitingQueue) Forget(item interface{}) {}
func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
return f.requeues
}
func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
f.item = item
f.duration = duration
}
func TestJobBackoff(t *testing.T) {
job := newJob(1, 1, 1)
oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
oldPod.Status.Phase = v1.PodRunning
oldPod.ResourceVersion = "1"
newPod := oldPod.DeepCopy()
newPod.ResourceVersion = "2"
testCases := map[string]struct {
// inputs
requeues int
phase v1.PodPhase
// expectation
backoff int
}{
"1st failure": {0, v1.PodFailed, 0},
"2nd failure": {1, v1.PodFailed, 1},
"3rd failure": {2, v1.PodFailed, 2},
"1st success": {0, v1.PodSucceeded, 0},
"2nd success": {1, v1.PodSucceeded, 0},
"1st running": {0, v1.PodSucceeded, 0},
"2nd running": {1, v1.PodSucceeded, 0},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
queue := &fakeRateLimitingQueue{}
manager.queue = queue
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
queue.requeues = tc.requeues
newPod.Status.Phase = tc.phase
manager.updatePod(oldPod, newPod)
if queue.duration.Nanoseconds() != int64(tc.backoff)*DefaultJobBackOff.Nanoseconds() {
t.Errorf("unexpected backoff %v", queue.duration)
}
})
}
}