From 8c4e3af1a364be17be822bb9f81cee0e24abac6c Mon Sep 17 00:00:00 2001 From: deads2k Date: Tue, 19 Apr 2016 10:22:22 -0400 Subject: [PATCH] switch job controller to shared informer --- .../app/controllermanager.go | 2 +- .../controllermanager/controllermanager.go | 2 +- pkg/controller/job/controller.go | 51 +++++++++------- pkg/controller/job/controller_test.go | 58 +++++++++++-------- 4 files changed, 64 insertions(+), 49 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index c52e860ad08..452f3ee3fdc 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -330,7 +330,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)). + go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). Run(s.ConcurrentJobSyncs, wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 4bad79bba52..bb2bbf3e8d2 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -254,7 +254,7 @@ func (s *CMServer) Run(_ []string) error { if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod). + go job.NewJobControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod). Run(s.ConcurrentJobSyncs, wait.NeverStop) } diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 2a259303d27..1db765e8e2b 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -44,6 +45,13 @@ type JobController struct { kubeClient clientset.Interface podControl controller.PodControlInterface + // internalPodInformer is used to hold a personal informer. If we're using + // a normal shared informer, then the informer will be started for us. If + // we have a personal informer, we must start it ourselves. If you start + // the controller using NewJobController(passing SharedInformer), this + // will be null + internalPodInformer framework.SharedInformer + // To allow injection of updateJobStatus for testing. updateHandler func(job *extensions.Job) error syncHandler func(jobKey string) error @@ -61,8 +69,6 @@ type JobController struct { // A store of pods, populated by the podController podStore cache.StoreToPodLister - // Watches changes to all pods - podController *framework.Controller // Jobs that need to be updated queue *workqueue.Type @@ -70,7 +76,7 @@ type JobController struct { recorder record.EventRecorder } -func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { +func NewJobController(podInformer framework.SharedInformer, kubeClient clientset.Interface) *JobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. @@ -110,27 +116,24 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re }, ) - jm.podStore.Store, jm.podController = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return jm.kubeClient.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return jm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod(), - framework.ResourceEventHandlerFuncs{ - AddFunc: jm.addPod, - UpdateFunc: jm.updatePod, - DeleteFunc: jm.deletePod, - }, - ) + podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ + AddFunc: jm.addPod, + UpdateFunc: jm.updatePod, + DeleteFunc: jm.deletePod, + }) + jm.podStore.Store = podInformer.GetStore() + jm.podStoreSynced = podInformer.HasSynced jm.updateHandler = jm.updateJobStatus jm.syncHandler = jm.syncJob - jm.podStoreSynced = jm.podController.HasSynced + return jm +} + +func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { + podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod()) + jm := NewJobController(podInformer, kubeClient) + jm.internalPodInformer = podInformer + return jm } @@ -138,10 +141,14 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re func (jm *JobController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go jm.jobController.Run(stopCh) - go jm.podController.Run(stopCh) for i := 0; i < workers; i++ { go wait.Until(jm.worker, time.Second, stopCh) } + + if jm.internalPodInformer != nil { + go jm.internalPodInformer.Run(stopCh) + } + <-stopCh glog.Infof("Shutting down Job Manager") jm.queue.ShutDown() diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index 63235cb3846..5bf7b50d1f0 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -18,8 +18,8 @@ package job import ( "fmt" + "reflect" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" @@ -32,7 +32,6 @@ import ( "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/rand" - "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" ) @@ -208,7 +207,7 @@ func TestControllerSyncJob(t *testing.T) { for name, tc := range testCases { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControllerError} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -303,7 +302,7 @@ func TestSyncJobPastDeadline(t *testing.T) { for name, tc := range testCases { // job manager setup clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -373,7 +372,7 @@ func getCondition(job *extensions.Job, condition extensions.JobConditionType) bo func TestSyncPastDeadlineJobFinished(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -407,7 +406,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { func TestSyncJobComplete(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -432,7 +431,7 @@ func TestSyncJobComplete(t *testing.T) { func TestSyncJobDeleted(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -452,7 +451,7 @@ func TestSyncJobDeleted(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -473,7 +472,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { func TestJobPodLookup(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady testCases := []struct { job *extensions.Job @@ -563,7 +562,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { // and checking expectations. func TestSyncJobExpectations(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -599,8 +598,8 @@ type FakeWatcher struct { func TestWatchJobs(t *testing.T) { clientset := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil)) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady var testJob extensions.Job @@ -614,9 +613,12 @@ func TestWatchJobs(t *testing.T) { if !exists || err != nil { t.Errorf("Expected to find job under key %v", key) } - job := *obj.(*extensions.Job) - if !api.Semantic.DeepDerivative(job, testJob) { - t.Errorf("Expected %#v, but got %#v", testJob, job) + job, ok := obj.(*extensions.Job) + if !ok { + t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj) + } + if !api.Semantic.DeepDerivative(*job, testJob) { + t.Errorf("Expected %#v, but got %#v", testJob, *job) } close(received) return nil @@ -625,8 +627,7 @@ func TestWatchJobs(t *testing.T) { // and make sure it hits the sync method. stopCh := make(chan struct{}) defer close(stopCh) - go manager.jobController.Run(stopCh) - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + go manager.Run(1, stopCh) // We're sending new job to see if it reaches syncHandler. testJob.Name = "foo" @@ -661,27 +662,35 @@ func TestIsJobFinished(t *testing.T) { } func TestWatchPods(t *testing.T) { - clientset := fake.NewSimpleClientset() + testJob := newJob(2, 2) + clientset := fake.NewSimpleClientset(testJob) fakeWatch := watch.NewFake() - clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobController(clientset, controller.NoResyncPeriodFunc) + clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil)) + manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady // Put one job and one pod into the store - testJob := newJob(2, 2) manager.jobStore.Store.Add(testJob) received := make(chan struct{}) // The pod update sent through the fakeWatcher should figure out the managing job and // send it into the syncHandler. manager.syncHandler = func(key string) error { - obj, exists, err := manager.jobStore.Store.GetByKey(key) if !exists || err != nil { t.Errorf("Expected to find job under key %v", key) + close(received) + return nil + } + job, ok := obj.(*extensions.Job) + if !ok { + t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj) + close(received) + return nil } - job := obj.(*extensions.Job) if !api.Semantic.DeepDerivative(job, testJob) { t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job) + close(received) + return nil } close(received) return nil @@ -690,8 +699,7 @@ func TestWatchPods(t *testing.T) { // and make sure it hits the sync method for the right job. stopCh := make(chan struct{}) defer close(stopCh) - go manager.podController.Run(stopCh) - go wait.Until(manager.worker, 10*time.Millisecond, stopCh) + go manager.Run(1, stopCh) pods := newPodList(1, api.PodRunning, testJob) testPod := pods[0]