diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index ba8d4500fbb..21c7fc8257c 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -19,14 +19,13 @@ limitations under the License.
 package endpoint
 
 import (
+	"fmt"
 	"reflect"
 	"strconv"
 	"time"
 
 	"encoding/json"
-	"fmt"
-
-	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/endpoints"
 	"k8s.io/kubernetes/pkg/api/errors"
@@ -44,6 +43,8 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
+
+	"github.com/golang/glog"
 )
 
 const (
diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go
index 27c6c2179e5..68c43b1611b 100644
--- a/pkg/controller/job/jobcontroller.go
+++ b/pkg/controller/job/jobcontroller.go
@@ -17,12 +17,12 @@ limitations under the License.
 package job
 
 import (
+	"fmt"
 	"reflect"
 	"sort"
 	"sync"
 	"time"
 
-	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/batch"
@@ -39,6 +39,8 @@ import (
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
+
+	"github.com/golang/glog"
 )
 
 type JobController struct {
@@ -71,7 +73,7 @@ type JobController struct {
 	podStore cache.StoreToPodLister
 
 	// Jobs that need to be updated
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 
 	recorder record.EventRecorder
 }
@@ -93,7 +95,7 @@ func NewJobController(podInformer cache.SharedIndexInformer, kubeClient clientse
 			Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
 		},
 		expectations: controller.NewControllerExpectations(),
-		queue:        workqueue.NewNamed("job"),
+		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
 		recorder:     eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
 	}
 
@@ -144,6 +146,12 @@ func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod con
 // Run the main goroutine responsible for watching and syncing jobs.
 func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer jm.queue.ShutDown()
+
+	if !cache.WaitForCacheSync(stopCh, jm.podStoreSynced) {
+		return
+	}
+
 	go jm.jobController.Run(stopCh)
 	for i := 0; i < workers; i++ {
 		go wait.Until(jm.worker, time.Second, stopCh)
@@ -155,7 +163,6 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 
 	<-stopCh
 	glog.Infof("Shutting down Job Manager")
-	jm.queue.ShutDown()
 }
 
 // getPodJob returns the job managing the given pod.
@@ -166,7 +173,7 @@ func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
 		return nil
 	}
 	if len(jobs) > 1 {
-		glog.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)
+		utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
 		sort.Sort(byCreationTimestamp(jobs))
 	}
 	return &jobs[0]
@@ -184,7 +191,7 @@ func (jm *JobController) addPod(obj interface{}) {
 	if job := jm.getPodJob(pod); job != nil {
 		jobKey, err := controller.KeyFunc(job)
 		if err != nil {
-			glog.Errorf("Couldn't get key for job %#v: %v", job, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
 			return
 		}
 		jm.expectations.CreationObserved(jobKey)
@@ -236,19 +243,19 @@ func (jm *JobController) deletePod(obj interface{}) {
 	if !ok {
 		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %+v", obj)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
 			return
 		}
 		pod, ok = tombstone.Obj.(*api.Pod)
 		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
+			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj))
 			return
 		}
 	}
 	if job := jm.getPodJob(pod); job != nil {
 		jobKey, err := controller.KeyFunc(job)
 		if err != nil {
-			glog.Errorf("Couldn't get key for job %#v: %v", job, err)
+			utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
 			return
 		}
 		jm.expectations.DeletionObserved(jobKey)
@@ -260,7 +267,7 @@ func (jm *JobController) deletePod(obj interface{}) {
 func (jm *JobController) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
@@ -276,21 +283,29 @@ func (jm *JobController) enqueueController(obj interface{}) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (jm *JobController) worker() {
-	for {
-		func() {
-			key, quit := jm.queue.Get()
-			if quit {
-				return
-			}
-			defer jm.queue.Done(key)
-			err := jm.syncHandler(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing job: %v", err)
-			}
-		}()
+	for jm.processNextWorkItem() {
 	}
 }
 
+func (jm *JobController) processNextWorkItem() bool {
+	key, quit := jm.queue.Get()
+	if quit {
+		return false
+	}
+	defer jm.queue.Done(key)
+
+	err := jm.syncHandler(key.(string))
+	if err == nil {
+		jm.queue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
+	jm.queue.AddRateLimited(key)
+
+	return true
+}
+
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
@@ -300,14 +315,6 @@ func (jm *JobController) syncJob(key string) error {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !jm.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod)
-		glog.V(4).Infof("Waiting for pods controller to sync, requeuing job %v", key)
-		jm.queue.Add(key)
-		return nil
-	}
-
 	obj, exists, err := jm.jobStore.Store.GetByKey(key)
 	if !exists {
 		glog.V(4).Infof("Job has been deleted: %v", key)
@@ -315,8 +322,6 @@ func (jm *JobController) syncJob(key string) error {
 		return nil
 	}
 	if err != nil {
-		glog.Errorf("Unable to retrieve job %v from store: %v", key, err)
-		jm.queue.Add(key)
 		return err
 	}
 	job := *obj.(*batch.Job)
@@ -324,17 +329,10 @@ func (jm *JobController) syncJob(key string) error {
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
 	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
-	jobKey, err := controller.KeyFunc(&job)
-	if err != nil {
-		glog.Errorf("Couldn't get key for job %#v: %v", job, err)
-		return err
-	}
-	jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
+	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
 	selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
 	pods, err := jm.podStore.Pods(job.Namespace).List(selector)
 	if err != nil {
-		glog.Errorf("Error getting pods for job %q: %v", key, err)
-		jm.queue.Add(key)
 		return err
 	}
 
@@ -418,8 +416,7 @@ func (jm *JobController) syncJob(key string) error {
 		job.Status.Failed = failed
 
 		if err := jm.updateHandler(&job); err != nil {
-			glog.Errorf("Failed to update job %v, requeuing. Error: %v", job.Name, err)
-			jm.enqueueController(&job)
+			return err
 		}
 	}
 	return nil
@@ -464,7 +461,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
 	if err != nil {
-		glog.Errorf("Couldn't get key for job %#v: %v", job, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
 		return 0
 	}
 
@@ -516,7 +513,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *
 		}
 		diff := wantActive - active
 		if diff < 0 {
-			glog.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)
+			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
 			diff = 0
 		}
 		jm.expectations.ExpectCreations(jobKey, int(diff))
diff --git a/pkg/controller/job/jobcontroller_test.go b/pkg/controller/job/jobcontroller_test.go
index b2a57d57601..ce36def5785 100644
--- a/pkg/controller/job/jobcontroller_test.go
+++ b/pkg/controller/job/jobcontroller_test.go
@@ -476,12 +476,16 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
-	manager.updateHandler = func(job *batch.Job) error { return fmt.Errorf("Fake error") }
+	updateError := fmt.Errorf("Update error")
+	manager.updateHandler = func(job *batch.Job) error {
+		manager.queue.AddRateLimited(getKey(job, t))
+		return updateError
+	}
 	job := newJob(2, 2)
 	manager.jobStore.Store.Add(job)
 	err := manager.syncJob(getKey(job, t))
-	if err != nil {
-		t.Errorf("Unxpected error when syncing jobs, got %v", err)
+	if err == nil || err != updateError {
+		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
 	}
 	t.Log("Waiting for a job in the queue")
 	key, _ := manager.queue.Get()
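
For readers unfamiliar with the pattern this patch adopts: the worker/processNextWorkItem split above is the standard rate-limited work-queue loop (dequeue, sync, Forget on success, AddRateLimited on failure). The sketch below is not part of the patch; it restates that loop as a minimal, self-contained program against the same workqueue package the diff imports, with a trivial stand-in for JobController.syncJob so both branches are easy to follow.

// ratelimited_worker_sketch.go - illustrative only, not part of the change above.
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

// syncHandler is a stand-in for JobController.syncJob.
func syncHandler(key string) error {
	fmt.Printf("syncing %q\n", key)
	return nil
}

// processNextWorkItem mirrors the method added in the patch: it pops one key,
// syncs it, and decides between Forget (success) and AddRateLimited (failure).
func processNextWorkItem(queue workqueue.RateLimitingInterface) bool {
	key, quit := queue.Get()
	if quit {
		// The queue was shut down; tell the caller to stop the worker loop.
		return false
	}
	defer queue.Done(key)

	if err := syncHandler(key.(string)); err != nil {
		// Requeue with backoff instead of dropping the item on error.
		queue.AddRateLimited(key)
		return true
	}
	// Clear the key's failure history so later retries start from zero backoff.
	queue.Forget(key)
	return true
}

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	queue.Add("default/my-job")
	// ShutDown lets Get drain the remaining items and then report quit=true,
	// which is what lets the worker loop below terminate cleanly.
	queue.ShutDown()

	for processNextWorkItem(queue) {
	}
}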