Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-02 08:17:26 +00:00

Merge pull request #14196 from mikedanese/various-job

Auto commit by PR queue bot

This commit is contained in commit 3eabb81e5d.
@@ -153,7 +153,10 @@ func (jm *JobController) getPodJob(pod *api.Pod) *experimental.Job {
 		glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
 		return nil
 	}
-	// TODO: add sorting and rethink the overlaping controllers, internally and with RCs
+	if len(jobs) > 1 {
+		glog.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)
+		sort.Sort(byCreationTimestamp(jobs))
+	}
 	return &jobs[0]
 }
 
@@ -286,12 +289,12 @@ func (jm *JobController) syncJob(key string) error {
 
 	obj, exists, err := jm.jobStore.Store.GetByKey(key)
 	if !exists {
-		glog.Infof("Job has been deleted %v", key)
+		glog.V(4).Infof("Job has been deleted: %v", key)
 		jm.expectations.DeleteExpectations(key)
 		return nil
 	}
 	if err != nil {
-		glog.Infof("Unable to retrieve job %v from store: %v", key, err)
+		glog.Errorf("Unable to retrieve job %v from store: %v", key, err)
 		jm.queue.Add(key)
 		return err
 	}
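
For reference, the two log changes in this hunk follow glog's convention: V(n) messages are emitted only when the process runs with -v=n or higher, while Errorf always logs at ERROR severity. A minimal standalone sketch, assuming the standard github.com/golang/glog package; the key and error strings are placeholders, not values from this PR:

package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	// glog registers its flags (-v, -logtostderr, ...) on the standard flag set.
	flag.Parse()

	// Suppressed unless the binary is started with -v=4 or higher,
	// matching the downgraded "Job has been deleted" message above.
	glog.V(4).Infof("Job has been deleted: %v", "default/example-job")

	// Always emitted at ERROR severity, matching the upgraded store-error message.
	glog.Errorf("Unable to retrieve job %v from store: %v", "default/example-job", "simulated error")

	glog.Flush()
}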
@@ -299,7 +302,7 @@ func (jm *JobController) syncJob(key string) error {
 	if !jm.podStoreSynced() {
 		// Sleep so we give the pod reflector goroutine a chance to run.
 		time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod)
-		glog.Infof("Waiting for pods controller to sync, requeuing job %v", job.Name)
+		glog.V(4).Infof("Waiting for pods controller to sync, requeuing job %v", job.Name)
 		jm.enqueueController(&job)
 		return nil
 	}
@@ -338,7 +341,7 @@ func (jm *JobController) syncJob(key string) error {
 		job.Status.Unsuccessful = unsuccessful
 
 		if err := jm.updateHandler(&job); err != nil {
-			glog.V(2).Infof("Failed to update job %v, requeuing", job.Name)
+			glog.Errorf("Failed to update job %v, requeuing", job.Name)
 			jm.enqueueController(&job)
 		}
 	}
@@ -361,6 +364,7 @@ func getStatus(pods []api.Pod) (successful, unsuccessful int) {
 }
 
 func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
+	var activeLock sync.Mutex
 	active := len(activePods)
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
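
The new activeLock exists because the delete and create goroutines later in manageJob now correct the shared active count when a pod operation fails. A minimal standalone sketch of that mutex-plus-WaitGroup pattern (illustrative only, not the controller's code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		activeLock sync.Mutex
		wait       sync.WaitGroup
		active     int
	)
	for i := 0; i < 4; i++ {
		wait.Add(1)
		go func() {
			defer wait.Done()
			// Pretend a pod operation failed and the count must be corrected;
			// without the mutex this increment would be a data race.
			activeLock.Lock()
			active++
			activeLock.Unlock()
		}()
	}
	wait.Wait()
	fmt.Println("active:", active) // deterministically 4
}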
@@ -372,7 +376,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
 	if active > parallelism {
 		diff := active - parallelism
 		jm.expectations.ExpectDeletions(jobKey, diff)
-		glog.V(2).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
+		glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
 		// Sort the pods in the order such that not-ready < ready, unscheduled
 		// < scheduled, and pending < running. This ensures that we delete pods
 		// in the earlier stages whenever possible.
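
The comment above states the deletion ordering only in words. A hedged sketch of such a comparator follows; it uses a simplified pod stand-in rather than api.Pod and is not the comparator the controller actually calls, only an illustration of "delete the least-advanced pods first":

package job

// podInfo is an illustrative stand-in for a pod; field names are hypothetical.
type podInfo struct {
	Name     string
	NodeName string // empty means unscheduled
	Running  bool   // false means pending
	Ready    bool
}

// byDeletionPriority orders pods so the cheapest-to-delete come first:
// not-ready < ready, unscheduled < scheduled, pending < running.
type byDeletionPriority []podInfo

func (p byDeletionPriority) Len() int      { return len(p) }
func (p byDeletionPriority) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p byDeletionPriority) Less(i, j int) bool {
	if p[i].Ready != p[j].Ready {
		return !p[i].Ready
	}
	if (p[i].NodeName == "") != (p[j].NodeName == "") {
		return p[i].NodeName == ""
	}
	if p[i].Running != p[j].Running {
		return !p[i].Running
	}
	return false
}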
@@ -385,11 +389,12 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
 			go func(ix int) {
 				defer wait.Done()
 				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name); err != nil {
+					defer util.HandleError(err)
 					// Decrement the expected number of deletes because the informer won't observe this deletion
-					glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q", jobKey)
 					jm.expectations.DeletionObserved(jobKey)
-					util.HandleError(err)
+					activeLock.Lock()
 					active++
+					activeLock.Unlock()
 				}
 			}(i)
 		}
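
Two things change in this error branch: util.HandleError is now deferred at the top instead of called at the end, and the shared active count is corrected under activeLock. The call to DeletionObserved relies on the controller's expectations bookkeeping; the sketch below is a much-simplified, hypothetical stand-in for that idea (the real shared expectations type has a richer API), showing why a failed delete must lower the expectation by hand, since the informer will never report a deletion that did not happen:

package job

import "sync"

// deleteExpectations is an illustrative stand-in, not the controller's type.
type deleteExpectations struct {
	mu      sync.Mutex
	pending map[string]int // job key -> deletions still expected from the informer
}

func newDeleteExpectations() *deleteExpectations {
	return &deleteExpectations{pending: make(map[string]int)}
}

// ExpectDeletions records that n pod deletions should be observed for this job.
func (e *deleteExpectations) ExpectDeletions(key string, n int) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pending[key] += n
}

// DeletionObserved lowers the expectation, either because the informer saw a
// deletion or, as in the hunk above, because the DeletePod call failed.
func (e *deleteExpectations) DeletionObserved(key string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.pending[key] > 0 {
		e.pending[key]--
	}
}

// SatisfiedExpectations reports whether the job can be synced again.
func (e *deleteExpectations) SatisfiedExpectations(key string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.pending[key] == 0
}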
@@ -404,7 +409,7 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
 		}
 		diff -= active
 		jm.expectations.ExpectCreations(jobKey, diff)
-		glog.V(2).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff)
+		glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, parallelism, diff)
 
 		active += diff
 		wait := sync.WaitGroup{}
@@ -413,11 +418,12 @@ func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessf
 			go func() {
 				defer wait.Done()
 				if err := jm.podControl.CreatePods(job.Namespace, job.Spec.Template, job); err != nil {
+					defer util.HandleError(err)
 					// Decrement the expected number of creates because the informer won't observe this pod
-					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q", jobKey)
 					jm.expectations.CreationObserved(jobKey)
-					util.HandleError(err)
+					activeLock.Lock()
 					active--
+					activeLock.Unlock()
 				}
 			}()
 		}
@@ -442,3 +448,16 @@ func filterPods(pods []api.Pod, phase api.PodPhase) int {
 	}
 	return result
 }
+
+// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
+type byCreationTimestamp []experimental.Job
+
+func (o byCreationTimestamp) Len() int      { return len(o) }
+func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+
+func (o byCreationTimestamp) Less(i, j int) bool {
+	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
+		return o[i].Name < o[j].Name
+	}
+	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
+}
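
A standalone usage sketch of the ordering the new type provides: it mirrors the Less method above but uses a trivial local struct instead of experimental.Job, so the snippet has no repository dependencies. After sort.Sort, index 0 holds the oldest job (names break ties), which is what getPodJob now returns when several jobs select the same pod:

package main

import (
	"fmt"
	"sort"
	"time"
)

type job struct {
	Name              string
	CreationTimestamp time.Time
}

type byCreationTimestamp []job

func (o byCreationTimestamp) Len() int      { return len(o) }
func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o byCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

func main() {
	t0 := time.Now()
	jobs := []job{
		{Name: "b", CreationTimestamp: t0},
		{Name: "c", CreationTimestamp: t0.Add(time.Minute)},
		{Name: "a", CreationTimestamp: t0},
	}
	sort.Sort(byCreationTimestamp(jobs))
	// Oldest first, name as tie breaker: prints "a b c".
	fmt.Println(jobs[0].Name, jobs[1].Name, jobs[2].Name)
}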