diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 2feb17825dd..abc2e0e5fdb 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -264,7 +264,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		KubeClient:                resourceQuotaControllerClient,
 		ResyncPeriod:              controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
 		Registry:                  resourceQuotaRegistry,
-		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient),
+		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient),
 		ReplenishmentResyncPeriod: ResyncPeriod(s),
 		GroupKindsToReplenish:     groupKindsToReplenish,
 	}
@@ -324,14 +324,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
+		go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
 
 	if containsResource(resources, "jobs") {
 		glog.Infof("Starting job controller")
-		go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), ResyncPeriod(s)).
+		go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
 			Run(s.ConcurrentJobSyncs, wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index 05c09f90c9d..8e2868b1d4f 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -193,7 +193,7 @@ func (s *CMServer) Run(_ []string) error {
 		Registry:                  resourceQuotaRegistry,
 		GroupKindsToReplenish:     groupKindsToReplenish,
 		ReplenishmentResyncPeriod: s.resyncPeriod,
-		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient),
+		ControllerFactory:         resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(resourceQuotaControllerClient),
 	}
 	go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop)
 
@@ -248,13 +248,13 @@ func (s *CMServer) Run(_ []string) error {
 
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
+		go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
 	}
 
 	if containsResource(resources, "jobs") {
 		glog.Infof("Starting job controller")
-		go job.NewJobController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
+		go job.NewJobControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller")), s.resyncPeriod).
 			Run(s.ConcurrentJobSyncs, wait.NeverStop)
 	}
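For reference, a minimal sketch of the wiring the controller-manager changes above enable: one shared pod informer is created up front and handed to every pod-watching controller, so the apiserver sees a single pod list/watch instead of one per controller. The startControllers wrapper, worker counts, and lookup cache size below are illustrative; only the constructors and the informers helper come from this patch.

// Sketch only: wrapper name, worker counts and cache size are assumptions.
package sharedpodinformer

import (
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/pkg/controller/framework/informers"
	"k8s.io/kubernetes/pkg/controller/job"
)

func startControllers(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, stop <-chan struct{}) {
	// One pod informer, created once and shared by every consumer.
	podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())

	go daemon.NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, 0 /* lookupCacheSize */).
		Run(2, stop)
	go job.NewJobController(podInformer, kubeClient).Run(2, stop)

	// Whoever creates the shared informer is responsible for running it.
	go podInformer.Run(stop)

	<-stop
}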
diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go
index 5302cdda787..84c4c2dafde 100644
--- a/pkg/controller/daemon/controller.go
+++ b/pkg/controller/daemon/controller.go
@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -69,6 +70,13 @@ type DaemonSetsController struct {
 	eventRecorder record.EventRecorder
 	podControl    controller.PodControlInterface
 
+	// internalPodInformer is used to hold a personal informer. If we're using
+	// a normal shared informer, then the informer will be started for us. If
+	// we have a personal informer, we must start it ourselves. If you start
+	// the controller using NewDaemonSetsController(passing a SharedInformer), this
+	// will be nil.
+	internalPodInformer framework.SharedInformer
+
 	// An dsc is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -86,7 +94,7 @@ type DaemonSetsController struct {
 	// Watches changes to all daemon sets.
 	dsController *framework.Controller
 	// Watches changes to all pods
-	podController *framework.Controller
+	podController framework.ControllerInterface
 	// Watches changes to all nodes.
 	nodeController *framework.Controller
 	// podStoreSynced returns true if the pod store has been synced at least once.
@@ -99,7 +107,7 @@ type DaemonSetsController struct {
 	queue *workqueue.Type
 }
 
-func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
+func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	// TODO: remove the wrapper when every clients have moved to use the clientset.
@@ -163,25 +171,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 			},
 		},
 	)
+
 	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
 	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
-	dsc.podStore.Store, dsc.podController = framework.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return dsc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dsc.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		resyncPeriod(),
-		framework.ResourceEventHandlerFuncs{
-			AddFunc:    dsc.addPod,
-			UpdateFunc: dsc.updatePod,
-			DeleteFunc: dsc.deletePod,
-		},
-	)
+	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+		AddFunc:    dsc.addPod,
+		UpdateFunc: dsc.updatePod,
+		DeleteFunc: dsc.deletePod,
+	})
+	dsc.podStore.Store = podInformer.GetStore()
+	dsc.podController = podInformer.GetController()
+	dsc.podStoreSynced = podInformer.HasSynced
+
 	// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change,
 	dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
 		&cache.ListWatch{
@@ -200,11 +201,18 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 		},
 	)
 	dsc.syncHandler = dsc.syncDaemonSet
-	dsc.podStoreSynced = dsc.podController.HasSynced
 	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
 }
 
+func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
+	podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
+	dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
+	dsc.internalPodInformer = podInformer
+
+	return dsc
+}
+
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
@@ -215,6 +223,11 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(dsc.worker, time.Second, stopCh)
 	}
+
+	if dsc.internalPodInformer != nil {
+		go dsc.internalPodInformer.Run(stopCh)
+	}
+
 	<-stopCh
 	glog.Infof("Shutting down Daemon Set Controller")
 	dsc.queue.ShutDown()
diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go
index 61b08c85ca6..9d3a47a6681 100644
--- a/pkg/controller/daemon/controller_test.go
+++ b/pkg/controller/daemon/controller_test.go
@@ -133,7 +133,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
 
 func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc, 0)
+	manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
	manager.podStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}
 	manager.podControl = podControl
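The daemon set controller above shows the consumption pattern every converted controller follows: register handlers on the shared informer, read from its store, gate on HasSynced, and only run an informer you created yourself. A condensed sketch follows; the podConsumer type and the enqueue stubs are hypothetical, while the framework.SharedInformer calls mirror the patch.

// Hypothetical consumer type; only the SharedInformer calls mirror the patch.
package sharedpodinformer

import (
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/controller/framework"
)

type podConsumer struct {
	podStore            cache.StoreToPodLister
	podController       framework.ControllerInterface
	podStoreSynced      func() bool
	internalPodInformer framework.SharedInformer // set only when built "FromClient"
}

func newPodConsumer(podInformer framework.SharedInformer) *podConsumer {
	c := &podConsumer{}
	// Register handlers on the shared informer instead of building a private
	// framework.NewInformer with its own pod list/watch.
	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* enqueue */ },
		UpdateFunc: func(old, cur interface{}) { /* enqueue */ },
		DeleteFunc: func(obj interface{}) { /* enqueue */ },
	})
	c.podStore.Store = podInformer.GetStore()     // read pods from the shared cache
	c.podController = podInformer.GetController() // fills the old *framework.Controller role
	c.podStoreSynced = podInformer.HasSynced      // gate syncs until the cache has synced
	return c
}

func (c *podConsumer) Run(stopCh <-chan struct{}) {
	// Only a personal informer is started here; a genuinely shared one is run
	// by its creator (see the controller-manager wiring above).
	if c.internalPodInformer != nil {
		go c.internalPodInformer.Run(stopCh)
	}
	<-stopCh
}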
diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go
index 2a259303d27..1db765e8e2b 100644
--- a/pkg/controller/job/controller.go
+++ b/pkg/controller/job/controller.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -44,6 +45,13 @@ type JobController struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
 
+	// internalPodInformer is used to hold a personal informer. If we're using
+	// a normal shared informer, then the informer will be started for us. If
+	// we have a personal informer, we must start it ourselves. If you start
+	// the controller using NewJobController(passing a SharedInformer), this
+	// will be nil.
+	internalPodInformer framework.SharedInformer
+
 	// To allow injection of updateJobStatus for testing.
 	updateHandler func(job *extensions.Job) error
 	syncHandler   func(jobKey string) error
@@ -61,8 +69,6 @@ type JobController struct {
 	// A store of pods, populated by the podController
 	podStore cache.StoreToPodLister
-	// Watches changes to all pods
-	podController *framework.Controller
 
 	// Jobs that need to be updated
 	queue *workqueue.Type
@@ -70,7 +76,7 @@
 	recorder record.EventRecorder
 }
 
-func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
+func NewJobController(podInformer framework.SharedInformer, kubeClient clientset.Interface) *JobController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	// TODO: remove the wrapper when every clients have moved to use the clientset.
@@ -110,27 +116,24 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re
 		},
 	)
 
-	jm.podStore.Store, jm.podController = framework.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return jm.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return jm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		resyncPeriod(),
-		framework.ResourceEventHandlerFuncs{
-			AddFunc:    jm.addPod,
-			UpdateFunc: jm.updatePod,
-			DeleteFunc: jm.deletePod,
-		},
-	)
+	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
+		AddFunc:    jm.addPod,
+		UpdateFunc: jm.updatePod,
+		DeleteFunc: jm.deletePod,
+	})
+	jm.podStore.Store = podInformer.GetStore()
+	jm.podStoreSynced = podInformer.HasSynced
 
 	jm.updateHandler = jm.updateJobStatus
 	jm.syncHandler = jm.syncJob
-	jm.podStoreSynced = jm.podController.HasSynced
+	return jm
+}
+
+func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
+	podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
+	jm := NewJobController(podInformer, kubeClient)
+	jm.internalPodInformer = podInformer
 
 	return jm
 }
@@ -138,10 +141,14 @@ func NewJobController(kubeClient clientset.Interface, resyncPeriod controller.Re
 func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	go jm.jobController.Run(stopCh)
-	go jm.podController.Run(stopCh)
 	for i := 0; i < workers; i++ {
 		go wait.Until(jm.worker, time.Second, stopCh)
 	}
+
+	if jm.internalPodInformer != nil {
+		go jm.internalPodInformer.Run(stopCh)
+	}
+
 	<-stopCh
 	glog.Infof("Shutting down Job Manager")
 	jm.queue.ShutDown()
diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go
index 63235cb3846..5bf7b50d1f0 100644
--- a/pkg/controller/job/controller_test.go
+++ b/pkg/controller/job/controller_test.go
@@ -18,8 +18,8 @@ package job
 
 import (
 	"fmt"
+	"reflect"
 	"testing"
-	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/testapi"
@@ -32,7 +32,6 @@ import (
 	"k8s.io/kubernetes/pkg/client/unversioned/testclient"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/rand"
-	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/watch"
 )
 
@@ -208,7 +207,7 @@ func TestControllerSyncJob(t *testing.T) {
 	for name, tc := range testCases {
 		// job manager setup
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-		manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+		manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
 		manager.podControl = &fakePodControl
 		manager.podStoreSynced = alwaysReady
@@ -303,7 +302,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 	for name, tc := range testCases {
 		// job manager setup
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-		manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+		manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{}
 		manager.podControl = &fakePodControl
 		manager.podStoreSynced = alwaysReady
@@ -373,7 +372,7 @@ func getCondition(job *extensions.Job, condition extensions.JobConditionType) bo
 
 func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -407,7 +406,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 
 func TestSyncJobComplete(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -432,7 +431,7 @@ func TestSyncJobComplete(t *testing.T) {
 
 func TestSyncJobDeleted(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -452,7 +451,7 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -473,7 +472,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 
 func TestJobPodLookup(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	manager.podStoreSynced = alwaysReady
 	testCases := []struct {
 		job *extensions.Job
@@ -563,7 +562,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
 // and checking expectations.
 func TestSyncJobExpectations(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -599,8 +598,8 @@ type FakeWatcher struct {
 func TestWatchJobs(t *testing.T) {
 	clientset := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
-	clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	manager.podStoreSynced = alwaysReady
 
 	var testJob extensions.Job
@@ -614,9 +613,12 @@ func TestWatchJobs(t *testing.T) {
 		if !exists || err != nil {
 			t.Errorf("Expected to find job under key %v", key)
 		}
-		job := *obj.(*extensions.Job)
-		if !api.Semantic.DeepDerivative(job, testJob) {
-			t.Errorf("Expected %#v, but got %#v", testJob, job)
+		job, ok := obj.(*extensions.Job)
+		if !ok {
+			t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
+		}
+		if !api.Semantic.DeepDerivative(*job, testJob) {
+			t.Errorf("Expected %#v, but got %#v", testJob, *job)
 		}
 		close(received)
 		return nil
@@ -625,8 +627,7 @@ func TestWatchJobs(t *testing.T) {
 	// and make sure it hits the sync method.
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-	go manager.jobController.Run(stopCh)
-	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
+	go manager.Run(1, stopCh)
 
 	// We're sending new job to see if it reaches syncHandler.
 	testJob.Name = "foo"
@@ -661,27 +662,35 @@ func TestIsJobFinished(t *testing.T) {
 }
 
 func TestWatchPods(t *testing.T) {
-	clientset := fake.NewSimpleClientset()
+	testJob := newJob(2, 2)
+	clientset := fake.NewSimpleClientset(testJob)
 	fakeWatch := watch.NewFake()
-	clientset.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
-	manager := NewJobController(clientset, controller.NoResyncPeriodFunc)
+	clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
+	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	manager.podStoreSynced = alwaysReady
 
 	// Put one job and one pod into the store
-	testJob := newJob(2, 2)
 	manager.jobStore.Store.Add(testJob)
 	received := make(chan struct{})
 	// The pod update sent through the fakeWatcher should figure out the managing job and
 	// send it into the syncHandler.
 	manager.syncHandler = func(key string) error {
-
 		obj, exists, err := manager.jobStore.Store.GetByKey(key)
 		if !exists || err != nil {
 			t.Errorf("Expected to find job under key %v", key)
+			close(received)
+			return nil
+		}
+		job, ok := obj.(*extensions.Job)
+		if !ok {
+			t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
+			close(received)
+			return nil
 		}
-		job := obj.(*extensions.Job)
 		if !api.Semantic.DeepDerivative(job, testJob) {
 			t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
+			close(received)
+			return nil
 		}
 		close(received)
 		return nil
@@ -690,8 +699,7 @@ func TestWatchPods(t *testing.T) {
 	// and make sure it hits the sync method for the right job.
 	stopCh := make(chan struct{})
 	defer close(stopCh)
-	go manager.podController.Run(stopCh)
-	go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
+	go manager.Run(1, stopCh)
 
 	pods := newPodList(1, api.PodRunning, testJob)
 	testPod := pods[0]
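The updated tests register the fake watch for a specific resource ("pods" or "jobs" rather than "*") and drive the controller through manager.Run, which also starts the internal pod informer. A trimmed sketch of that pattern is below; it would live in package job alongside controller_test.go and reuses that file's helpers and imports (newJob, newPodList, alwaysReady, fake, core, watch, api). The test name and the non-blocking signal are my own additions.

// Hypothetical test in the style of TestWatchPods above; relies on the existing
// helpers and imports of controller_test.go.
func TestPodWatchDrivesSync(t *testing.T) {
	testJob := newJob(2, 2)
	clientset := fake.NewSimpleClientset(testJob)
	fakeWatch := watch.NewFake()
	// React only to pod watches; "*" would also hijack the job informer's watch.
	clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))

	manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
	manager.podStoreSynced = alwaysReady
	manager.jobStore.Store.Add(testJob)

	received := make(chan struct{}, 1)
	manager.syncHandler = func(key string) error {
		// A matching pod event should funnel back to the owning job's key.
		select {
		case received <- struct{}{}:
		default:
		}
		return nil
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	go manager.Run(1, stopCh) // starts the workers plus the internal pod informer

	// Emit a pod owned by testJob through the fake watch and wait for the sync.
	pods := newPodList(1, api.PodRunning, testJob)
	fakeWatch.Add(&pods[0])
	<-received
}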
diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go
index c6f0014625b..19cd1e8895e 100644
--- a/pkg/controller/resourcequota/replenishment_controller.go
+++ b/pkg/controller/resourcequota/replenishment_controller.go
@@ -28,6 +28,7 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
+	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/quota/evaluator/core"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@@ -86,43 +87,46 @@ func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func
 
 // ReplenishmentControllerFactory knows how to build replenishment controllers
 type ReplenishmentControllerFactory interface {
-	// NewController returns a controller configured with the specified options
-	NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error)
+	// NewController returns a controller configured with the specified options.
+	// This method is NOT thread-safe.
+	NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error)
 }
 
 // replenishmentControllerFactory implements ReplenishmentControllerFactory
 type replenishmentControllerFactory struct {
-	kubeClient clientset.Interface
+	kubeClient  clientset.Interface
+	podInformer framework.SharedInformer
 }
 
 // NewReplenishmentControllerFactory returns a factory that knows how to build controllers
 // to replenish resources when updated or deleted
-func NewReplenishmentControllerFactory(kubeClient clientset.Interface) ReplenishmentControllerFactory {
+func NewReplenishmentControllerFactory(podInformer framework.SharedInformer, kubeClient clientset.Interface) ReplenishmentControllerFactory {
 	return &replenishmentControllerFactory{
-		kubeClient: kubeClient,
+		kubeClient:  kubeClient,
+		podInformer: podInformer,
 	}
 }
 
-func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) {
-	var result *framework.Controller
+func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
+	return NewReplenishmentControllerFactory(nil, kubeClient)
+}
+
+func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) {
+	var result framework.ControllerInterface
 	switch options.GroupKind {
 	case api.Kind("Pod"):
-		_, result = framework.NewInformer(
-			&cache.ListWatch{
-				ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-					return r.kubeClient.Core().Pods(api.NamespaceAll).List(options)
-				},
-				WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-					return r.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
-				},
-			},
-			&api.Pod{},
-			options.ResyncPeriod(),
-			framework.ResourceEventHandlerFuncs{
+		if r.podInformer != nil {
+			r.podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
 				UpdateFunc: PodReplenishmentUpdateFunc(options),
 				DeleteFunc: ObjectReplenishmentDeleteFunc(options),
-			},
-		)
+			})
+			result = r.podInformer.GetController()
+			break
+		}
+
+		r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod())
+		result = r.podInformer
+
 	case api.Kind("Service"):
 		_, result = framework.NewInformer(
 			&cache.ListWatch{
diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go
index 0a21ebf57f4..47b684d462e 100644
--- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go
@@ -69,7 +69,7 @@ type ResourceQuotaController struct {
 	// knows how to calculate usage
 	registry quota.Registry
 	// controllers monitoring to notify for replenishment
-	replenishmentControllers []*framework.Controller
+	replenishmentControllers []framework.ControllerInterface
 }
 
 func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
@@ -79,7 +79,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
 		queue:                     workqueue.New(),
 		resyncPeriod:              options.ResyncPeriod,
 		registry:                  options.Registry,
-		replenishmentControllers:  []*framework.Controller{},
+		replenishmentControllers:  []framework.ControllerInterface{},
 	}
 
 	// set the synchronization handler
diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go
index cd49a377e15..71ef5df79b4 100644
--- a/pkg/controller/resourcequota/resource_quota_controller_test.go
+++ b/pkg/controller/resourcequota/resource_quota_controller_test.go
@@ -113,7 +113,7 @@ func TestSyncResourceQuota(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactory(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
@@ -199,7 +199,7 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactory(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
@@ -276,7 +276,7 @@ func TestSyncResourceQuotaNoChange(t *testing.T) {
 			api.Kind("ReplicationController"),
 			api.Kind("PersistentVolumeClaim"),
 		},
-		ControllerFactory:         NewReplenishmentControllerFactory(kubeClient),
+		ControllerFactory:         NewReplenishmentControllerFactoryFromClient(kubeClient),
 		ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
 	}
 	quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)
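For the quota path, the change leaves callers two construction routes: pass a shared pod informer the caller owns and runs, or use the FromClient variant and let the factory build its own informer on demand. A small illustrative sketch follows; the startQuotaController helper, resync values, and parameter types are assumptions, while the option fields and factory constructors are the ones shown in this patch.

// Illustrative wiring only: helper name, resync values and parameter types are assumed.
package sharedpodinformer

import (
	"time"

	"k8s.io/kubernetes/pkg/api/unversioned"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
	"k8s.io/kubernetes/pkg/quota"
	"k8s.io/kubernetes/pkg/util/wait"
)

func startQuotaController(kubeClient clientset.Interface, podInformer framework.SharedInformer, registry quota.Registry, groupKindsToReplenish []unversioned.GroupKind) {
	options := &resourcequotacontroller.ResourceQuotaControllerOptions{
		KubeClient:                kubeClient,
		ResyncPeriod:              controller.StaticResyncPeriodFunc(5 * time.Minute),
		Registry:                  registry,
		GroupKindsToReplenish:     groupKindsToReplenish,
		ReplenishmentResyncPeriod: controller.StaticResyncPeriodFunc(5 * time.Minute),
		// Shared path: the caller owns podInformer and is responsible for running it.
		ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, kubeClient),
		// Standalone path (Mesos controller manager, tests): the factory lazily
		// builds its own pod informer instead:
		//   ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(kubeClient),
	}
	go resourcequotacontroller.NewResourceQuotaController(options).Run(2, wait.NeverStop)
}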