From 305342c48d7300a599ad008727d822e6664bdacd Mon Sep 17 00:00:00 2001 From: Dominika Hodovska Date: Thu, 4 Aug 2016 09:06:29 +0200 Subject: [PATCH] Use shared informer factory in controllers --- .../app/controllermanager.go | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 661e01f9e43..9812f8f1b1e 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -30,7 +30,6 @@ import ( "net/http" "net/http/pprof" "os" - "reflect" "strconv" "time" @@ -50,7 +49,6 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" - "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/job" @@ -197,22 +195,14 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { - podInformer := informers.NewPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) - nodeInformer := informers.NewNodeInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)()) - pvcInformer := informers.NewPVCInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)()) - pvInformer := informers.NewPVInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)()) - informers := map[reflect.Type]framework.SharedIndexInformer{} - informers[reflect.TypeOf(&api.Pod{})] = podInformer - informers[reflect.TypeOf(&api.Node{})] = nodeInformer - informers[reflect.TypeOf(&api.PersistentVolumeClaim{})] = pvcInformer - informers[reflect.TypeOf(&api.PersistentVolume{})] = pvInformer + sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)()) - go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). + go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) go replicationcontroller.NewReplicationManager( - podInformer, + sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), ResyncPeriod(s), replicationcontroller.BurstReplicas, @@ -240,7 +230,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if err != nil { glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) } - nodeController, err := nodecontroller.NewNodeController(podInformer, cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), + nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) @@ -284,7 +274,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig KubeClient: resourceQuotaControllerClient, ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), Registry: resourceQuotaRegistry, - ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient), + ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers.Pods().Informer(), resourceQuotaControllerClient), ReplenishmentResyncPeriod: ResyncPeriod(s), GroupKindsToReplenish: groupKindsToReplenish, } @@ -344,14 +334,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "daemonsets") { glog.Infof("Starting daemon set controller") - go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). + go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } if containsResource(resources, "jobs") { glog.Infof("Starting job controller") - go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). + go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). Run(int(s.ConcurrentJobSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -365,7 +355,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig if containsResource(resources, "replicasets") { glog.Infof("Starting ReplicaSet controller") - go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). + go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). Run(int(s.ConcurrentRSSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) } @@ -380,7 +370,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Infof("Starting PetSet controller") resyncPeriod := ResyncPeriod(s)() go petset.NewPetSetController( - podInformer, + sharedInformers.Pods().Informer(), // TODO: Switch to using clientset kubeClient, resyncPeriod, @@ -410,10 +400,10 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig attachDetachController, attachDetachControllerErr := attachdetach.NewAttachDetachController( clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), - podInformer, - nodeInformer, - pvcInformer, - pvInformer, + sharedInformers.Pods().Informer(), + sharedInformers.Nodes().Informer(), + sharedInformers.PersistentVolumeClaims().Informer(), + sharedInformers.PersistentVolumes().Informer(), cloud, ProbeAttachableVolumePlugins(s.VolumeConfiguration)) if attachDetachControllerErr != nil { @@ -497,10 +487,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig } } - // run the shared informers - for _, informer := range informers { - go informer.Run(wait.NeverStop) - } + sharedInformers.Start(stop) select {} }