diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 17e65367438..6ded634d7da 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -153,7 +153,10 @@ func (jm *JobController) getPodJob(pod *api.Pod) *experimental.Job { glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name) return nil } - // TODO: add sorting and rethink the overlaping controllers, internally and with RCs + if len(jobs) > 1 { + glog.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels) + sort.Sort(byCreationTimestamp(jobs)) + } return &jobs[0] } @@ -286,12 +289,12 @@ func (jm *JobController) syncJob(key string) error { obj, exists, err := jm.jobStore.Store.GetByKey(key) if !exists { - glog.Infof("Job has been deleted %v", key) + glog.V(4).Infof("Job has been deleted: %v", key) jm.expectations.DeleteExpectations(key) return nil } if err != nil { - glog.Infof("Unable to retrieve job %v from store: %v", key, err) + glog.Errorf("Unable to retrieve job %v from store: %v", key, err) jm.queue.Add(key) return err } @@ -299,7 +302,7 @@ func (jm *JobController) syncJob(key string) error { if !jm.podStoreSynced() { // Sleep so we give the pod reflector goroutine a chance to run. time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod) - glog.Infof("Waiting for pods controller to sync, requeuing job %v", job.Name) + glog.V(4).Infof("Waiting for pods controller to sync, requeuing job %v", job.Name) jm.enqueueController(&job) return nil } @@ -338,7 +341,7 @@ func (jm *JobController) syncJob(key string) error { job.Status.Unsuccessful = unsuccessful if err := jm.updateHandler(&job); err != nil { - glog.V(2).Infof("Failed to update job %v, requeuing", job.Name) + glog.Errorf("Failed to update job %v, requeuing", job.Name) jm.enqueueController(&job) } } @@ -361,6 +364,7 @@ func getStatus(pods []api.Pod) (successful, unsuccessful int) { } func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int { + var activeLock sync.Mutex active := len(activePods) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) @@ -372,7 +376,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf if active > parallelism { diff := active - parallelism jm.expectations.ExpectDeletions(jobKey, diff) - glog.V(2).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) + glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff) // Sort the pods in the order such that not-ready < ready, unscheduled // < scheduled, and pending < running. This ensures that we delete pods // in the earlier stages whenever possible. @@ -385,11 +389,12 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf go func(ix int) { defer wait.Done() if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name); err != nil { + defer util.HandleError(err) // Decrement the expected number of deletes because the informer won't observe this deletion - glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q", jobKey) jm.expectations.DeletionObserved(jobKey) - util.HandleError(err) + activeLock.Lock() active++ + activeLock.Unlock() } }(i) } @@ -404,7 +409,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf } diff -= active jm.expectations.ExpectCreations(jobKey, diff) - glog.V(2).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff) + glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff) active += diff wait := sync.WaitGroup{} @@ -413,11 +418,12 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf go func() { defer wait.Done() if err := jm.podControl.CreatePods(job.Namespace, job.Spec.Template, job); err != nil { + defer util.HandleError(err) // Decrement the expected number of creates because the informer won't observe this pod - glog.V(2).Infof("Failed creation, decrementing expectations for controller %q", jobKey) jm.expectations.CreationObserved(jobKey) - util.HandleError(err) + activeLock.Lock() active-- + activeLock.Unlock() } }() } @@ -442,3 +448,16 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int { } return result } + +// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. +type byCreationTimestamp []experimental.Job + +func (o byCreationTimestamp) Len() int { return len(o) } +func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o byCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +}