From 22072af90dec352b7c69115397291f25baa8e296 Mon Sep 17 00:00:00 2001
From: Mike Danese
Date: Thu, 17 Sep 2015 19:16:04 -0700
Subject: [PATCH] rename jobmanager to jobcontroller

---
 .../app/controllermanager.go                  |  2 +-
 .../job/{job_controller.go => controller.go}  | 30 +++++++++++-----------
 ..._controller_test.go => controller_test.go} | 14 +++++-------
 3 files changed, 23 insertions(+), 23 deletions(-)
 rename pkg/controller/job/{job_controller.go => controller.go} (93%)
 rename pkg/controller/job/{job_controller_test.go => controller_test.go} (98%)

diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 59458cb0e37..77c6fadfc65 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -241,7 +241,7 @@ func (s *CMServer) Run(_ []string) error {
 	go daemon.NewDaemonSetsController(kubeClient).
 		Run(s.ConcurrentDSCSyncs, util.NeverStop)
 
-	go job.NewJobManager(kubeClient).
+	go job.NewJobController(kubeClient).
 		Run(s.ConcurrentJobSyncs, util.NeverStop)
 
 	cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/controller.go
similarity index 93%
rename from pkg/controller/job/job_controller.go
rename to pkg/controller/job/controller.go
index 1126af26682..0ec8023f9e6 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/controller.go
@@ -40,7 +40,7 @@ import (
 	"k8s.io/kubernetes/pkg/watch"
 )
 
-type JobManager struct {
+type JobController struct {
 	kubeClient client.Interface
 	podControl controller.PodControlInterface
 
@@ -68,12 +68,12 @@ type JobManager struct {
 	queue *workqueue.Type
 }
 
-func NewJobManager(kubeClient client.Interface) *JobManager {
+func NewJobController(kubeClient client.Interface) *JobController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
 
-	jm := &JobManager{
+	jm := &JobController{
 		kubeClient: kubeClient,
 		podControl: controller.RealPodControl{
 			KubeClient: kubeClient,
@@ -134,7 +134,7 @@ func NewJobManager(kubeClient client.Interface) *JobManager {
 }
 
 // Run the main goroutine responsible for watching and syncing jobs.
-func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
+func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 	defer util.HandleCrash()
 	go jm.jobController.Run(stopCh)
 	go jm.podController.Run(stopCh)
@@ -147,10 +147,10 @@ func (jm *JobManager) Run(workers int, stopCh <-chan struct{}) {
 }
 
 // getPodJob returns the job managing the given pod.
-func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
+func (jm *JobController) getPodJob(pod *api.Pod) *experimental.Job {
 	jobs, err := jm.jobStore.GetPodJobs(pod)
 	if err != nil {
-		glog.V(4).Infof("No jobs found for pod %v, job manager will avoid syncing", pod.Name)
+		glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
 		return nil
 	}
 	// TODO: add sorting and rethink the overlapping controllers, internally and with RCs
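The controller.go hunks above capture the controller's whole lifecycle: NewJobController wires watch-driven informers to a work queue, and Run starts the informer goroutines plus a fixed pool of workers that drain that queue until the stop channel closes. Below is a minimal, dependency-free sketch of that Run(workers, stopCh) shape; the names are invented for illustration (a plain channel stands in for workqueue.Type and util.Until) and are not the Kubernetes API.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type controller struct {
        queue chan string // stands in for workqueue.Type
    }

    // Run starts the worker goroutines, then blocks until stopCh closes.
    func (c *controller) Run(workers int, stopCh <-chan struct{}) {
        var wg sync.WaitGroup
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                c.worker(stopCh)
            }()
        }
        <-stopCh
        close(c.queue) // release any worker blocked on the queue
        wg.Wait()
        fmt.Println("shutting down job controller")
    }

    func (c *controller) worker(stopCh <-chan struct{}) {
        for {
            select {
            case key, ok := <-c.queue:
                if !ok {
                    return
                }
                fmt.Println("sync", key) // syncJob(key) would run here
            case <-stopCh:
                return
            }
        }
    }

    func main() {
        c := &controller{queue: make(chan string, 8)}
        stop := make(chan struct{})
        c.queue <- "default/my-job"
        go func() { time.Sleep(100 * time.Millisecond); close(stop) }()
        c.Run(2, stop) // blocks until stop closes and the workers exit
    }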
@@ -158,10 +158,10 @@ func (jm *JobManager) getPodJob(pod *api.Pod) *experimental.Job {
 }
 
 // When a pod is created, enqueue the controller that manages it and update its expectations.
-func (jm *JobManager) addPod(obj interface{}) {
+func (jm *JobController) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
 	if pod.DeletionTimestamp != nil {
 		// on a restart of the controller manager, it's possible a new pod shows up in a state that
 		// is already pending deletion. Prevent the pod from being a creation observation.
 		jm.deletePod(pod)
 		return
@@ -180,7 +180,7 @@ func (jm *JobManager) addPod(obj interface{}) {
 // When a pod is updated, figure out what job/s manage it and wake them up.
 // If the labels of the pod have changed we need to awaken both the old
 // and new job. old and cur must be *api.Pod types.
-func (jm *JobManager) updatePod(old, cur interface{}) {
+func (jm *JobController) updatePod(old, cur interface{}) {
 	if api.Semantic.DeepEqual(old, cur) {
 		// A periodic relist will send update events for all known pods.
 		return
@@ -210,7 +210,7 @@ func (jm *JobManager) updatePod(old, cur interface{}) {
 
 // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
 // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) deletePod(obj interface{}) {
+func (jm *JobController) deletePod(obj interface{}) {
 	pod, ok := obj.(*api.Pod)
 
 	// When a delete is dropped, the relist will notice a pod in the store not
@@ -241,7 +241,7 @@ func (jm *JobManager) deletePod(obj interface{}) {
 }
 
 // obj could be an *experimental.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobManager) enqueueController(obj interface{}) {
+func (jm *JobController) enqueueController(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
@@ -259,7 +259,7 @@ func (jm *JobManager) enqueueController(obj interface{}) {
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
-func (jm *JobManager) worker() {
+func (jm *JobController) worker() {
 	for {
 		func() {
 			key, quit := jm.queue.Get()
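The worker loop above relies on a contract provided by workqueue.Type rather than anything in this file: Add de-duplicates keys that are already pending, Get hands a key to exactly one worker at a time, and a key re-added while it is being processed is re-queued only after Done. That is what lets syncJob assume it is never invoked concurrently with the same key. A toy queue demonstrating just that contract (illustrative only, not the real pkg/util/workqueue code):

    package main

    import (
        "fmt"
        "sync"
    )

    type queue struct {
        mu         sync.Mutex
        cond       *sync.Cond
        items      []string
        dirty      map[string]bool // queued or re-queued keys
        processing map[string]bool // keys currently held by a worker
    }

    func newQueue() *queue {
        q := &queue{dirty: map[string]bool{}, processing: map[string]bool{}}
        q.cond = sync.NewCond(&q.mu)
        return q
    }

    // Add enqueues key unless it is already pending; if a worker holds the
    // key, it is only marked dirty and re-queued later by Done.
    func (q *queue) Add(key string) {
        q.mu.Lock()
        defer q.mu.Unlock()
        if q.dirty[key] {
            return
        }
        q.dirty[key] = true
        if q.processing[key] {
            return
        }
        q.items = append(q.items, key)
        q.cond.Signal()
    }

    // Get blocks until a key is available and marks it as being processed.
    func (q *queue) Get() string {
        q.mu.Lock()
        defer q.mu.Unlock()
        for len(q.items) == 0 {
            q.cond.Wait()
        }
        key := q.items[0]
        q.items = q.items[1:]
        delete(q.dirty, key)
        q.processing[key] = true
        return key
    }

    // Done releases the key; if it was re-added meanwhile, put it back in line.
    func (q *queue) Done(key string) {
        q.mu.Lock()
        defer q.mu.Unlock()
        delete(q.processing, key)
        if q.dirty[key] {
            q.items = append(q.items, key)
            q.cond.Signal()
        }
    }

    func main() {
        q := newQueue()
        q.Add("default/my-job")
        q.Add("default/my-job") // de-duplicated while pending
        key := q.Get()
        q.Add(key) // re-added while processing: deferred until Done
        q.Done(key)
        fmt.Println(q.Get()) // the deferred re-add arrives exactly once
    }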
@@ -278,7 +278,7 @@ func (jm *JobManager) worker() {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobManager) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) error {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -365,7 +365,7 @@ func getStatus(jobKey string, restartPolicy api.RestartPolicy, pods []api.Pod) (
 	return
 }
 
-func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
+func (jm *JobController) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
 	active := len(activePods)
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -436,7 +436,7 @@ func (jm *JobManager) manageJob(activePods []*api.Pod, successful, unsuccessful int, job *experimental.Job) int {
 	return active
 }
 
-func (jm *JobManager) updateJob(job *experimental.Job) error {
+func (jm *JobController) updateJob(job *experimental.Job) error {
 	_, err := jm.kubeClient.Experimental().Jobs(job.Namespace).Update(job)
 	return err
 }
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/controller_test.go
similarity index 98%
rename from pkg/controller/job/job_controller_test.go
rename to pkg/controller/job/controller_test.go
index 1a608df6574..d6bcda20dc3 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/controller_test.go
@@ -218,7 +218,7 @@ func TestControllerSyncJob(t *testing.T) {
 	for name, tc := range testCases {
 		// job manager setup
 		client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-		manager := NewJobManager(client)
+		manager := NewJobController(client)
 		fakePodControl := FakePodControl{err: tc.podControllerError}
 		manager.podControl = &fakePodControl
 		manager.podStoreSynced = alwaysReady
@@ -282,7 +282,7 @@ func TestControllerSyncJob(t *testing.T) {
 
 func TestSyncJobDeleted(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -302,7 +302,7 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -332,7 +332,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 
 func TestJobPodLookup(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 	testCases := []struct {
 		job *experimental.Job
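manageJob is the reconciliation step: it compares the number of active pods against Spec.Parallelism (bounded by the completions still outstanding), creates or deletes pods to close the gap, and returns the new active count. A small stand-alone sketch of that arithmetic; podDelta and its parameters are invented names that only mirror the idea, not the experimental.Job API:

    package main

    import "fmt"

    // podDelta returns how many pods to create (positive) or delete
    // (negative), given the currently active pods, the desired parallelism,
    // and the completions still outstanding.
    func podDelta(active, parallelism, remainingCompletions int) int {
        desired := parallelism
        if remainingCompletions < parallelism {
            // No point running more pods than there are completions left.
            desired = remainingCompletions
        }
        return desired - active
    }

    func main() {
        fmt.Println(podDelta(1, 3, 10)) // 2: scale up to parallelism
        fmt.Println(podDelta(3, 3, 2))  // -1: fewer completions left than workers
        fmt.Println(podDelta(3, 3, 3))  // 0: steady state
    }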
@@ -412,7 +412,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
 // and checking expectations.
 func TestSyncJobExpectations(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	fakePodControl := FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -449,7 +449,7 @@ func TestWatchJobs(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{}
 	client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 
 	var testJob experimental.Job
@@ -512,7 +512,7 @@ func TestWatchPods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{}
 	client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobManager(client)
+	manager := NewJobController(client)
 	manager.podStoreSynced = alwaysReady
 
 	// Put one job and one pod into the store
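TestWatchJobs and TestWatchPods above follow a common pattern: a fake watch source (watch.NewFake, wired in through testclient.Fake's AddWatchReactor) lets the test inject events and then assert that the controller's handler observed them. A dependency-free sketch of the same pattern, with invented names standing in for the testclient API:

    package main

    import (
        "fmt"
        "time"
    )

    type event struct{ name string }

    func main() {
        fakeWatch := make(chan event) // plays the role of watch.NewFake()
        got := make(chan string, 1)

        // The "controller" side: consume watch events and record what was synced.
        go func() {
            for e := range fakeWatch {
                got <- e.name // a real controller would enqueue and syncJob here
            }
        }()

        // The test side: inject an event, then assert on the observed sync.
        fakeWatch <- event{name: "foo"}
        select {
        case name := <-got:
            fmt.Println("synced:", name)
        case <-time.After(time.Second):
            fmt.Println("timed out waiting for sync")
        }
    }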