diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 452f3ee3fdc..ff98768035b 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -263,7 +263,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig KubeClient: resourceQuotaControllerClient, ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient), + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient), ReplenishmentResyncPeriod: ResyncPeriod(s), GroupKindsToReplenish: groupKindsToReplenish, } diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index bb2bbf3e8d2..8e2868b1d4f 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -193,7 +193,7 @@ func (s *CMServer) Run(_ []string) error { Registry: resourceQuotaRegistry, GroupKindsToReplenish: groupKindsToReplenish, ReplenishmentResyncPeriod: s.resyncPeriod, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(resourceQuotaControllerClient), + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactoryFromClient(resourceQuotaControllerClient), } go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(s.ConcurrentResourceQuotaSyncs, wait.NeverStop) diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index a75ba291dd8..c8a6031a812 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -28,6 +28,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/quota/evaluator/core" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" @@ -86,43 +87,46 @@ func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func // ReplenishmentControllerFactory knows how to build replenishment controllers type ReplenishmentControllerFactory interface { - // NewController returns a controller configured with the specified options - NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) + // NewController returns a controller configured with the specified options. + // This method is NOT thread-safe. + NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) } // replenishmentControllerFactory implements ReplenishmentControllerFactory type replenishmentControllerFactory struct { - kubeClient clientset.Interface + kubeClient clientset.Interface + podInformer framework.SharedInformer } // NewReplenishmentControllerFactory returns a factory that knows how to build controllers // to replenish resources when updated or deleted -func NewReplenishmentControllerFactory(kubeClient clientset.Interface) ReplenishmentControllerFactory { +func NewReplenishmentControllerFactory(podInformer framework.SharedInformer, kubeClient clientset.Interface) ReplenishmentControllerFactory { return &replenishmentControllerFactory{ - kubeClient: kubeClient, + kubeClient: kubeClient, + podInformer: podInformer, } } -func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (*framework.Controller, error) { - var result *framework.Controller +func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory { + return NewReplenishmentControllerFactory(nil, kubeClient) +} + +func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) { + var result framework.ControllerInterface switch options.GroupKind { case api.Kind("Pod"): - _, result = framework.NewInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return r.kubeClient.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return r.kubeClient.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - options.ResyncPeriod(), - framework.ResourceEventHandlerFuncs{ + if r.podInformer != nil { + r.podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{ UpdateFunc: PodReplenishmentUpdateFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options), - }, - ) + }) + result = r.podInformer.GetController() + break + } + + r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod()) + result = r.podInformer + case api.Kind("Service"): _, result = framework.NewInformer( &cache.ListWatch{ diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 0a21ebf57f4..47b684d462e 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -69,7 +69,7 @@ type ResourceQuotaController struct { // knows how to calculate usage registry quota.Registry // controllers monitoring to notify for replenishment - replenishmentControllers []*framework.Controller + replenishmentControllers []framework.ControllerInterface } func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController { @@ -79,7 +79,7 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour queue: workqueue.New(), resyncPeriod: options.ResyncPeriod, registry: options.Registry, - replenishmentControllers: []*framework.Controller{}, + replenishmentControllers: []framework.ControllerInterface{}, } // set the synchronization handler diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index b65fb192225..f50dc5b67a9 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -113,7 +113,7 @@ func TestSyncResourceQuota(t *testing.T) { api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactory(kubeClient), + ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) @@ -199,7 +199,7 @@ func TestSyncResourceQuotaSpecChange(t *testing.T) { api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactory(kubeClient), + ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions) @@ -276,7 +276,7 @@ func TestSyncResourceQuotaNoChange(t *testing.T) { api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), }, - ControllerFactory: NewReplenishmentControllerFactory(kubeClient), + ControllerFactory: NewReplenishmentControllerFactoryFromClient(kubeClient), ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc, } quotaController := NewResourceQuotaController(resourceQuotaControllerOptions)