rename jobmanager to jobcontroller
@@ -241,7 +241,7 @@ func (s *CMServer) Run(_ []string) error {
 	go daemon.NewDaemonSetsController(kubeClient).
 		Run(s.ConcurrentDSCSyncs, util.NeverStop)

-	go job.NewJobManager(kubeClient).
+	go job.NewJobController(kubeClient).
 		Run(s.ConcurrentJobSyncs, util.NeverStop)

 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
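This first hunk is the only call-site change: the controller manager still builds each controller once and launches its Run loop on its own goroutine, passing a concurrency knob and util.NeverStop, a channel that is never closed. A minimal standalone sketch of that start-up pattern follows; the Controller type and neverStop variable are illustrative stand-ins, not the real Kubernetes API.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // neverStop mirrors util.NeverStop: a channel that is never closed,
    // so a controller started with it runs for the life of the process.
    var neverStop = make(chan struct{})

    // Controller is an illustrative stand-in for the job controller.
    type Controller struct{ name string }

    func NewController(name string) *Controller { return &Controller{name: name} }

    // Run launches `workers` goroutines and blocks until stopCh closes.
    func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                for {
                    select {
                    case <-stopCh:
                        return
                    case <-time.After(time.Second):
                        fmt.Printf("%s worker %d: sync pass\n", c.name, id)
                    }
                }
            }(i)
        }
        wg.Wait()
    }

    func main() {
        // As in the hunk: one goroutine per controller, concurrency knob, neverStop.
        go NewController("job").Run(5, neverStop)
        time.Sleep(3 * time.Second) // let a few sync passes run, then exit
    }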
@@ -40,7 +40,7 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )

-type JobManager struct {
+type JobController struct {
 	kubeClient client.Interface
 	podControl controller.PodControlInterface
@@ -68,12 +68,12 @@ type JobManager struct {
 	queue *workqueue.Type
 }

-func NewJobManager(kubeClient client.Interface) *JobManager {
+func NewJobController(kubeClient client.Interface) *JobController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))

-	jm := &JobManager{
+	jm := &JobController{
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
@@ -134,7 +134,7 @@ func NewJobManager(kubeClient client.Interface) *JobManager {
 }

 // Run the main goroutine responsible for watching and syncing jobs.
-func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
+func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 	defer util.HandleCrash()
 	go jm.jobController.Run(stopCh)
 	go jm.podController.Run(stopCh)
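Run itself follows the controller idiom of this era: recover from crashes, start the informer goroutines, then spawn the requested number of workers that each loop until stopCh closes. A rough standalone sketch of the worker-spawning step, with a simplified until helper standing in for util.Until:

    package main

    import "time"

    // until is a simplified stand-in for util.Until: call f, then wait
    // period, repeating until stopCh is closed.
    func until(f func(), period time.Duration, stopCh <-chan struct{}) {
        for {
            select {
            case <-stopCh:
                return
            default:
            }
            f()
            select {
            case <-stopCh:
                return
            case <-time.After(period):
            }
        }
    }

    func main() {
        stopCh := make(chan struct{})
        worker := func() { /* dequeue one job key and sync it */ }

        // JobController.Run spawns `workers` of these before blocking on stopCh.
        for i := 0; i < 5; i++ {
            go until(worker, time.Second, stopCh)
        }

        time.Sleep(2 * time.Second)
        close(stopCh) // all workers observe the close and return
    }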
@@ -147,10 +147,10 @@ func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
 }

 // getPodJob returns the job managing the given pod.
-func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
+func (jm *JobController) getPodJob(pod *api.Pod) *experimental.Job {
 	jobs, err := jm.jobStore.GetPodJobs(pod)
 	if err != nil {
-		glog.V(4).Infof("No jobs found for pod %v, job manager will avoid syncing", pod.Name)
+		glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
 		return nil
 	}
 	// TODO: add sorting and rethink the overlaping controllers, internally and with RCs
@@ -158,10 +158,10 @@ func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
 }

 // When a pod is created, enqueue the controller that manages it and update it's expectations.
-func (jm *JobManager) addPod(obj interface{}) {
+func (jm *JobController) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
 	if pod.DeletionTimestamp != nil {
-		// on a restart of the controller manager, it's possible a new pod shows up in a state that
+		// on a restart of the controller controller, it's possible a new pod shows up in a state that
 		// is already pending deletion. Prevent the pod from being a creation observation.
 		jm.deletePod(pod)
 		return
@@ -180,7 +180,7 @@ func (jm *JobManager) addPod(obj interface{}) {
 // When a pod is updated, figure out what job/s manage it and wake them up.
 // If the labels of the pod have changed we need to awaken both the old
 // and new job. old and cur must be *api.Pod types.
-func (jm *JobManager) updatePod(old, cur interface{}) {
+func (jm *JobController) updatePod(old, cur interface{}) {
 	if api.Semantic.DeepEqual(old, cur) {
 		// A periodic relist will send update events for all known pods.
 		return
@@ -210,7 +210,7 @@ func (jm *JobManager) updatePod(old, cur interface{}) {

 // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
 // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) deletePod(obj interface{}) {
+func (jm *JobController) deletePod(obj interface{}) {
 	pod, ok := obj.(*api.Pod)

 	// When a delete is dropped, the relist will notice a pod in the store not
@@ -241,7 +241,7 @@ func (jm *JobManager) deletePod(obj interface{}) {
 }

 // obj could be an *experimental.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) enqueueController(obj interface{}) {
+func (jm *JobController) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
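enqueueController depends on controller.KeyFunc, which produces a single string key for the object — conventionally namespace/name — so the workqueue can coalesce repeated events for the same job. A sketch assuming that namespace/name convention; these helpers are illustrative, not the real KeyFunc:

    package main

    import (
        "fmt"
        "strings"
    )

    // keyFunc mimics the namespace/name key scheme.
    func keyFunc(namespace, name string) string {
        if namespace == "" {
            return name // cluster-scoped objects key by name alone
        }
        return namespace + "/" + name
    }

    // splitKey reverses keyFunc when the worker dequeues an item.
    func splitKey(key string) (namespace, name string) {
        if i := strings.IndexByte(key, '/'); i >= 0 {
            return key[:i], key[i+1:]
        }
        return "", key
    }

    func main() {
        key := keyFunc("default", "pi-job")
        ns, name := splitKey(key)
        fmt.Println(key, ns, name) // default/pi-job default pi-job
    }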
@@ -259,7 +259,7 @@ func (jm *JobManager) enqueueController(obj interface{}) {

 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (jm *JobManager) worker() {
+func (jm *JobController) worker() {
 	for {
 		func() {
 			key, quit := jm.queue.Get()
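The worker comment captures the queue contract: Get blocks until a key is available, Done releases it, and the queue never hands the same key to two workers at once, which is what lets syncJob assume it is never invoked concurrently for one key. A toy queue with the same Get/Done shape; the real pkg/util/workqueue.Type also deduplicates keys and holds back re-adds of in-flight keys:

    package main

    import "fmt"

    // queue is a toy with the Get/Done shape of the real workqueue.
    type queue struct{ ch chan string }

    func (q *queue) Add(key string) { q.ch <- key }

    // Get returns the next key, or quit=true once the queue is shut down.
    func (q *queue) Get() (key string, quit bool) {
        k, ok := <-q.ch
        return k, !ok
    }

    func (q *queue) Done(key string) { /* real impl: allow key to be re-queued */ }

    func syncJob(key string) error {
        fmt.Println("syncing", key)
        return nil
    }

    // worker mirrors the loop in the diff: dequeue, sync, mark done.
    func worker(q *queue) {
        for {
            key, quit := q.Get()
            if quit {
                return
            }
            func() {
                defer q.Done(key) // always release the key, even on error
                if err := syncJob(key); err != nil {
                    fmt.Println("error syncing job:", err)
                }
            }()
        }
    }

    func main() {
        q := &queue{ch: make(chan string, 2)}
        q.Add("default/pi-job")
        q.Add("default/backup-job")
        close(q.ch) // drain remaining keys, then signal quit
        worker(q)
    }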
@@ -278,7 +278,7 @@ func (jm *JobManager) worker() {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobManager) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) error {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -365,7 +365,7 @@ func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (
 	return
 }

-func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
+func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
 	active := len(activePods)
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -436,7 +436,7 @@ func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful
 	return active
 }

-func (jm *JobManager) updateJob(job *experimental.Job) error {
+func (jm *JobController) updateJob(job *experimental.Job) error {
 	_, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job)
 	return err
 }
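Stripped of expectations bookkeeping and event recording, manageJob's core task is to converge the active pod count toward *job.Spec.Parallelism: delete the surplus when over, create the shortfall when under, and return the new active count. A simplified sketch of just that arithmetic, with hypothetical create/delete hooks standing in for podControl:

    package main

    import "fmt"

    // manage converges the active pod count toward the parallelism bound.
    // The hooks are hypothetical stand-ins for podControl calls.
    func manage(active, parallelism int, createPod, deletePod func()) int {
        switch {
        case active > parallelism:
            for i := 0; i < active-parallelism; i++ {
                deletePod()
            }
            active = parallelism
        case active < parallelism:
            for i := 0; i < parallelism-active; i++ {
                createPod()
            }
            active = parallelism
        }
        return active
    }

    func main() {
        created, deleted := 0, 0
        active := manage(2, 5, func() { created++ }, func() { deleted++ })
        fmt.Println(active, created, deleted) // 5 3 0
    }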
@@ -218,7 +218,7 @@ func TestControllerSyncJob(t *testing.T) {
 	for name, tc := range testCases {
 		// job manager setup
 		client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-		manager := NewJobManager(client)
+		manager := NewJobController(client)
 		fakePodControl := FakePodControl{err: tc.podControllerError}
 		manager.podControl = &fakePodControl
 		manager.podStoreSynced = alwaysReady
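Every test hunk that follows repeats the same setup: construct the controller against a test client, then overwrite its podControl and podStoreSynced fields with fakes so pod creations and deletions are recorded instead of executed. A reduced sketch of that injection seam; the interface shape here is illustrative, not the real controller.PodControlInterface:

    package main

    import "fmt"

    // podControl is the seam the tests exploit: production code uses a
    // real implementation, tests substitute a fake that records calls.
    type podControl interface {
        CreateReplica(namespace string) error
    }

    type fakePodControl struct {
        creates int
        err     error
    }

    func (f *fakePodControl) CreateReplica(namespace string) error {
        f.creates++
        return f.err // tests can force an error path, as podControllerError does
    }

    func main() {
        fake := &fakePodControl{}
        var pc podControl = fake
        _ = pc.CreateReplica("default")
        fmt.Println("creates recorded:", fake.creates) // creates recorded: 1
    }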
@@ -282,7 +282,7 @@ func TestControllerSyncJob(t *testing.T) {

 func TestSyncJobDeleted(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -302,7 +302,7 @@ func TestSyncJobDeleted(t *testing.T) {

 func TestSyncJobUpdateRequeue(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -332,7 +332,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {

 func TestJobPodLookup(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 	testCases := []struct {
 		job *experimental.Job
@@ -412,7 +412,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
 // and checking expectations.
 func TestSyncJobExpectations(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -449,7 +449,7 @@ func TestWatchJobs(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{}
 	client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady

 	var testJob experimental.Job
@@ -512,7 +512,7 @@ func TestWatchPods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{}
 	client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady

 	// Put one job and one pod into the store