Merge pull request #32785 from m1093782566/m109-job-controller-hot-loop

Automatic merge from submit-queue

[Controller Manager] Fix job controller hot loop

**What this PR does / why we need it:**

Fix the Job controller hot loop on unexpected API server rejections.
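
For context: with the old plain `workqueue.Type`, failure paths either logged and moved on or re-added the key immediately, so a persistently rejected update (e.g. repeated validation or authorization errors from the API server) made the worker spin at full speed on the same key. `workqueue.RateLimitingInterface` adds per-item backoff. A minimal sketch of the retry pattern this PR adopts (standalone names, for illustration only):

```go
import "k8s.io/kubernetes/pkg/util/workqueue"

// Sketch of the rate-limited worker pattern, assuming a queue built
// with workqueue.DefaultControllerRateLimiter().
func processNextWorkItem(queue workqueue.RateLimitingInterface, sync func(string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false // queue was shut down
	}
	defer queue.Done(key)

	if err := sync(key.(string)); err != nil {
		// Retry after a delay that grows with each consecutive failure.
		queue.AddRateLimited(key)
		return true
	}
	// Success: reset this key's failure count so the next error starts
	// again from the shortest backoff.
	queue.Forget(key)
	return true
}
```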

**Which issue this PR fixes**

Related issue is #30629

**Special notes for your reviewer:**

@deads2k @derekwaynecarr PTAL.
Kubernetes Submit Queue 2016-09-20 13:52:45 -07:00 committed by GitHub
commit ad7ba62b24
3 changed files with 51 additions and 49 deletions


@@ -19,14 +19,13 @@ limitations under the License.
package endpoint
import (
"fmt"
"reflect"
"strconv"
"time"
"encoding/json"
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
@@ -44,6 +43,8 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (


@@ -17,12 +17,12 @@ limitations under the License.
package job
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch"
@@ -39,6 +39,8 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
type JobController struct {
@@ -71,7 +73,7 @@ type JobController struct {
podStore cache.StoreToPodLister
// Jobs that need to be updated
queue *workqueue.Type
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
@@ -93,7 +95,7 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
},
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamed("job"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
}
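
For reference, `DefaultControllerRateLimiter()` pairs a per-item exponential backoff with an overall token bucket, so one misbehaving job backs off on its own while total requeue traffic stays bounded. A controller with different needs can pass any `workqueue.RateLimiter`; for example (durations here are illustrative, not the defaults):

```go
// Illustrative alternative: a named rate-limiting queue with only
// per-item exponential backoff, from 10ms up to a 5-minute cap.
queue := workqueue.NewNamedRateLimitingQueue(
	workqueue.NewItemExponentialFailureRateLimiter(10*time.Millisecond, 5*time.Minute),
	"job",
)
```
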
@@ -144,6 +146,12 @@ func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod con
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer jm.queue.ShutDown()
if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced) {
return
}
go jm.jobController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh)
@@ -155,7 +163,6 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
<-stopCh
glog.Infof("Shutting down Job Manager")
jm.queue.ShutDown()
}
// getPodJob returns the job managing the given pod.
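
Two details worth noting in the new `Run`: `ShutDown` moves into a `defer`, so the queue is closed (and any worker blocked in `Get()` released) on every exit path, including the new early return when the cache fails to sync; and workers only start after `WaitForCacheSync` on the shared pod informer (which is started elsewhere), which is what lets `syncJob` drop its poll-and-requeue fallback below. The general shape, as a sketch with simplified names:

```go
// Sketch of the Run shape after this change.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Deferred so the queue shuts down on every return path.
	defer c.queue.ShutDown()

	// Wait for the shared pod informer's cache before doing any work
	// that depends on it.
	if !cache.WaitForCacheSync(stopCh, c.podStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}
	<-stopCh
}
```
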
@@ -166,7 +173,7 @@ func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
return nil
}
if len(jobs) > 1 {
glog.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)
utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
sort.Sort(byCreationTimestamp(jobs))
}
return &jobs[0]
@@ -184,7 +191,7 @@ func (jm *JobController) addPod(obj interface{}) {
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.CreationObserved(jobKey)
@@ -236,19 +243,19 @@ func (jm *JobController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("Couldn't get object from tombstone %+v", obj)
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj))
return
}
}
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.DeletionObserved(jobKey)
@@ -260,7 +267,7 @@ func (jm *JobController) deletePod(obj interface{}) {
func (jm *JobController) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
@@ -276,21 +283,29 @@ func (jm *JobController) enqueueController(obj interface{}) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
for {
func() {
key, quit := jm.queue.Get()
if quit {
return
}
defer jm.queue.Done(key)
err := jm.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing job: %v", err)
}
}()
for jm.processNextWorkItem() {
}
}
func (jm *JobController) processNextWorkItem() bool {
key, quit := jm.queue.Get()
if quit {
return false
}
defer jm.queue.Done(key)
err := jm.syncHandler(key.(string))
if err == nil {
jm.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
jm.queue.AddRateLimited(key)
return true
}
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
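
Note the asymmetry in the new `processNextWorkItem`: `AddRateLimited` schedules a delayed retry and bumps the key's failure count, while `Forget` only clears that count; it does not remove anything from the queue. A controller that wants to give up on a key after repeated failures can combine these with `NumRequeues`. A hypothetical variant (the retry cap is not part of this PR):

```go
// Hypothetical retry cap on top of this PR's processNextWorkItem body.
const maxJobRetries = 15

err := jm.syncHandler(key.(string))
if err == nil {
	jm.queue.Forget(key)
	return true
}
if jm.queue.NumRequeues(key) < maxJobRetries {
	utilruntime.HandleError(fmt.Errorf("Error syncing job %v, retrying: %v", key, err))
	jm.queue.AddRateLimited(key)
	return true
}
// Too many failures: drop the key and reset its backoff state.
utilruntime.HandleError(fmt.Errorf("Dropping job %q out of the queue: %v", key, err))
jm.queue.Forget(key)
return true
```
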
@@ -300,14 +315,6 @@ func (jm *JobController) syncJob(key string) error {
glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
}()
if !jm.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod)
glog.V(4).Infof("Waiting for pods controller to sync, requeuing job %v", key)
jm.queue.Add(key)
return nil
}
obj, exists, err := jm.jobStore.Store.GetByKey(key)
if !exists {
glog.V(4).Infof("Job has been deleted: %v", key)
@@ -315,8 +322,6 @@ func (jm *JobController) syncJob(key string) error {
return nil
}
if err != nil {
glog.Errorf("Unable to retrieve job %v from store: %v", key, err)
jm.queue.Add(key)
return err
}
job := *obj.(*batch.Job)
@@ -324,17 +329,10 @@ func (jm *JobController) syncJob(key string) error {
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobKey, err := controller.KeyFunc(&job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
return err
}
jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
pods, err := jm.podStore.Pods(job.Namespace).List(selector)
if err != nil {
glog.Errorf("Error getting pods for job %q: %v", key, err)
jm.queue.Add(key)
return err
}
@@ -418,8 +416,7 @@ func (jm *JobController) syncJob(key string) error {
job.Status.Failed = failed
if err := jm.updateHandler(&job); err != nil {
glog.Errorf("Failed to update job %v, requeuing. Error: %v", job.Name, err)
jm.enqueueController(&job)
return err
}
}
return nil
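
The through-line of these `syncJob` edits: on failure, return the error and let `processNextWorkItem` apply backoff, rather than sleeping in the sync path or calling `queue.Add(key)` directly, which retries immediately with no throttling. Sketched side by side (simplified from the hunks above):

```go
// Before: each failure site requeued immediately, with no backoff.
if err != nil {
	glog.Errorf("Error getting pods for job %q: %v", key, err)
	jm.queue.Add(key) // retried as fast as the worker can loop
	return err
}

// After: just surface the error; the rate-limited requeue happens
// once, centrally, in processNextWorkItem.
if err != nil {
	return err
}
```
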
@@ -464,7 +461,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
glog.Errorf("Couldn't get key for job %#v: %v", job, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return 0
}
@@ -516,7 +513,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *
}
diff := wantActive - active
if diff < 0 {
glog.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)
utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
diff = 0
}
jm.expectations.ExpectCreations(jobKey, int(diff))


@@ -476,12 +476,16 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return fmt.Errorf("Fake error") }
updateError := fmt.Errorf("Update error")
manager.updateHandler = func(job *batch.Job) error {
manager.queue.AddRateLimited(getKey(job, t))
return updateError
}
job := newJob(2, 2)
manager.jobStore.Store.Add(job)
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("Unxpected error when syncing jobs, got %v", err)
if err == nil || err != updateError {
t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
}
t.Log("Waiting for a job in the queue")
key, _ := manager.queue.Get()
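
Because the test drives `syncJob` directly, it bypasses `processNextWorkItem`, where the rate-limited requeue now lives; that is why the fake `updateHandler` requeues the key itself before returning the error, so the `Get()` above can observe it. The excerpt is cut off here; a test like this would typically finish by comparing the dequeued key (illustrative, not necessarily the file's exact code):

```go
// Illustrative conclusion: assert the requeued key is our job's key.
expectedKey := getKey(job, t)
if key != expectedKey {
	t.Errorf("Expected requeue of job with key %s, got %s", expectedKey, key)
}
```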