Use shared informer factory in controllers

This commit is contained in:
Dominika Hodovska 2016-08-04 09:06:29 +02:00
parent 816f6d32ca
commit 305342c48d

View File

@ -30,7 +30,6 @@ import (
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
"os" "os"
"reflect"
"strconv" "strconv"
"time" "time"
@ -50,7 +49,6 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/deployment"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job"
@ -197,22 +195,14 @@ func Run(s *options.CMServer) error {
} }
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
podInformer := informers.NewPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)())
nodeInformer := informers.NewNodeInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)())
pvcInformer := informers.NewPVCInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)())
pvInformer := informers.NewPVInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedIndexInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer
informers[reflect.TypeOf(&api.Node{})] = nodeInformer
informers[reflect.TypeOf(&api.PersistentVolumeClaim{})] = pvcInformer
informers[reflect.TypeOf(&api.PersistentVolume{})] = pvInformer
go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go replicationcontroller.NewReplicationManager( go replicationcontroller.NewReplicationManager(
podInformer, sharedInformers.Pods().Informer(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
ResyncPeriod(s), ResyncPeriod(s),
replicationcontroller.BurstReplicas, replicationcontroller.BurstReplicas,
@ -240,7 +230,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if err != nil { if err != nil {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
} }
nodeController, err := nodecontroller.NewNodeController(podInformer, cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration, s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
@ -284,7 +274,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
KubeClient: resourceQuotaControllerClient, KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry, Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient), ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers.Pods().Informer(), resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s), ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish, GroupKindsToReplenish: groupKindsToReplenish,
} }
@ -344,14 +334,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") { if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller") glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if containsResource(resources, "jobs") { if containsResource(resources, "jobs") {
glog.Infof("Starting job controller") glog.Infof("Starting job controller")
go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
Run(int(s.ConcurrentJobSyncs), wait.NeverStop) Run(int(s.ConcurrentJobSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
@ -365,7 +355,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "replicasets") { if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller") glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
Run(int(s.ConcurrentRSSyncs), wait.NeverStop) Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
@ -380,7 +370,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Infof("Starting PetSet controller") glog.Infof("Starting PetSet controller")
resyncPeriod := ResyncPeriod(s)() resyncPeriod := ResyncPeriod(s)()
go petset.NewPetSetController( go petset.NewPetSetController(
podInformer, sharedInformers.Pods().Informer(),
// TODO: Switch to using clientset // TODO: Switch to using clientset
kubeClient, kubeClient,
resyncPeriod, resyncPeriod,
@ -410,10 +400,10 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
attachDetachController, attachDetachControllerErr := attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController( attachdetach.NewAttachDetachController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")),
podInformer, sharedInformers.Pods().Informer(),
nodeInformer, sharedInformers.Nodes().Informer(),
pvcInformer, sharedInformers.PersistentVolumeClaims().Informer(),
pvInformer, sharedInformers.PersistentVolumes().Informer(),
cloud, cloud,
ProbeAttachableVolumePlugins(s.VolumeConfiguration)) ProbeAttachableVolumePlugins(s.VolumeConfiguration))
if attachDetachControllerErr != nil { if attachDetachControllerErr != nil {
@ -497,10 +487,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
} }
} }
// run the shared informers sharedInformers.Start(stop)
for _, informer := range informers {
go informer.Run(wait.NeverStop)
}
select {} select {}
} }