From 816f6d32cac08fef87996ac20861ee968c1c8786 Mon Sep 17 00:00:00 2001 From: Dominika Hodovska Date: Thu, 4 Aug 2016 09:02:13 +0200 Subject: [PATCH] Collapse duplicate informer creation paths --- .../app/controllermanager.go | 8 +- pkg/controller/daemon/daemoncontroller.go | 2 +- .../endpoint/endpoints_controller.go | 2 +- pkg/controller/framework/informers/core.go | 92 +++---------------- pkg/controller/framework/informers/factory.go | 39 ++------ pkg/controller/job/jobcontroller.go | 2 +- pkg/controller/node/nodecontroller.go | 2 +- pkg/controller/replicaset/replica_set.go | 2 +- .../replication/replication_controller.go | 4 +- .../resourcequota/replenishment_controller.go | 2 +- .../attach_detach_controller_test.go | 8 +- .../reconciler/reconciler_test.go | 2 +- .../integration/replicaset/replicaset_test.go | 2 +- .../replicationcontroller_test.go | 2 +- 14 files changed, 40 insertions(+), 129 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index bc27e3567a5..661e01f9e43 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -197,10 +197,10 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { - podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) - nodeInformer := informers.CreateSharedNodeIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)()) - pvcInformer := informers.CreateSharedPVCIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)()) - pvInformer := informers.CreateSharedPVIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)()) + podInformer := informers.NewPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) + nodeInformer := informers.NewNodeInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)()) + pvcInformer := informers.NewPVCInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)()) + pvInformer := informers.NewPVInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)()) informers := map[reflect.Type]framework.SharedIndexInformer{} informers[reflect.TypeOf(&api.Pod{})] = podInformer informers[reflect.TypeOf(&api.Node{})] = nodeInformer diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go index c713f4a72a1..c65d9c13a79 100644 --- a/pkg/controller/daemon/daemoncontroller.go +++ b/pkg/controller/daemon/daemoncontroller.go @@ -205,7 +205,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie } func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize) dsc.internalPodInformer = podInformer diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index ce557bda81f..77a27383512 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -113,7 +113,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { - podInformer := informers.CreateSharedPodIndexInformer(client, resyncPeriod()) + podInformer := informers.NewPodInformer(client, resyncPeriod()) e := NewEndpointController(podInformer, client) e.internalPodInformer = podInformer diff --git a/pkg/controller/framework/informers/core.go b/pkg/controller/framework/informers/core.go index 744aaabad4c..a4f40b5870b 100644 --- a/pkg/controller/framework/informers/core.go +++ b/pkg/controller/framework/informers/core.go @@ -22,8 +22,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/watch" ) // PodInformer is type of SharedIndexInformer which watches and lists all pods. @@ -43,26 +41,12 @@ func (f *podInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.Pod{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.Pod{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewPodInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -92,25 +76,13 @@ type namespaceInformer struct { func (f *namespaceInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.Namespace{} - informerType := reflect.TypeOf(informerObj) + + informerType := reflect.TypeOf(&api.Namespace{}) informer, exists := f.informers[informerType] if exists { return informer } - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().Namespaces().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().Namespaces().Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{}, - ) + informer = NewNamespaceInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -141,26 +113,12 @@ func (f *nodeInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.Node{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.Node{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().Nodes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().Nodes().Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewNodeInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -191,26 +149,12 @@ func (f *pvcInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.PersistentVolumeClaim{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.PersistentVolumeClaim{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewPVCInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer @@ -241,26 +185,12 @@ func (f *pvInformer) Informer() framework.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() - informerObj := &api.PersistentVolume{} - informerType := reflect.TypeOf(informerObj) + informerType := reflect.TypeOf(&api.PersistentVolume{}) informer, exists := f.informers[informerType] if exists { return informer } - - informer = framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return f.client.Core().PersistentVolumes().List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return f.client.Core().PersistentVolumes().Watch(options) - }, - }, - informerObj, - f.defaultResync, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) + informer = NewPVInformer(f.client, f.defaultResync) f.informers[informerType] = informer return informer diff --git a/pkg/controller/framework/informers/factory.go b/pkg/controller/framework/informers/factory.go index d3292535788..de1a6918db1 100644 --- a/pkg/controller/framework/informers/factory.go +++ b/pkg/controller/framework/informers/factory.go @@ -101,27 +101,8 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer { return &pvInformer{sharedInformerFactory: f} } -// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods -func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { - sharedInformer := framework.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - return client.Core().Pods(api.NamespaceAll).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - return client.Core().Pods(api.NamespaceAll).Watch(options) - }, - }, - &api.Pod{}, - resyncPeriod, - cache.Indexers{}, - ) - - return sharedInformer -} - -// CreateSharedPodIndexInformer returns a SharedIndexInformer that lists and watches all pods -func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewPodInformer returns a SharedIndexInformer that lists and watches all pods +func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -139,8 +120,8 @@ func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time. return sharedIndexInformer } -// CreateSharedNodeIndexInformer returns a SharedIndexInformer that lists and watches all nodes -func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes +func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -157,8 +138,8 @@ func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time return sharedIndexInformer } -// CreateSharedPVCIndexInformer returns a SharedIndexInformer that lists and watches all PVCs -func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs +func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -175,8 +156,8 @@ func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time. return sharedIndexInformer } -// CreateSharedPVIndexInformer returns a SharedIndexInformer that lists and watches all PVs -func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs +func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { @@ -193,8 +174,8 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D return sharedIndexInformer } -// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces -func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { +// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces +func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { sharedIndexInformer := framework.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { diff --git a/pkg/controller/job/jobcontroller.go b/pkg/controller/job/jobcontroller.go index 46156d68794..b4639dec589 100644 --- a/pkg/controller/job/jobcontroller.go +++ b/pkg/controller/job/jobcontroller.go @@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie } func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) jm := NewJobController(podInformer, kubeClient) jm.internalPodInformer = podInformer diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 5c20a1c6dcc..77aa730ef76 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -343,7 +343,7 @@ func NewNodeControllerFromClient( serviceCIDR *net.IPNet, nodeCIDRMaskSize int, allocateNodeCIDRs bool) (*NodeController, error) { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc()) + podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc()) nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs) if err != nil { diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 4ffa0b48371..976cb545bfb 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -184,7 +184,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) garbageCollectorEnabled := false rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rsc.internalPodInformer = podInformer diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index e5ae8d1c1f6..91660a5307a 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -189,7 +189,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame // NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests. func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) garbageCollectorEnabled := false rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rm.internalPodInformer = podInformer @@ -198,7 +198,7 @@ func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interfac // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { - podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) + podInformer := informers.NewPodInformer(kubeClient, resyncPeriod()) garbageCollectorEnabled := false rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rm.internalPodInformer = podInformer diff --git a/pkg/controller/resourcequota/replenishment_controller.go b/pkg/controller/resourcequota/replenishment_controller.go index f5c92b2b7f7..46448f1d73a 100644 --- a/pkg/controller/resourcequota/replenishment_controller.go +++ b/pkg/controller/resourcequota/replenishment_controller.go @@ -129,7 +129,7 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon break } - r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod()) + r.podInformer = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod()) result = r.podInformer case api.Kind("Service"): diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index dcf68470e41..5fad9405db7 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -28,10 +28,10 @@ func Test_NewAttachDetachController_Positive(t *testing.T) { // Arrange fakeKubeClient := controllervolumetesting.CreateTestClient() resyncPeriod := 5 * time.Minute - podInformer := informers.CreateSharedPodIndexInformer(fakeKubeClient, resyncPeriod) - nodeInformer := informers.CreateSharedNodeIndexInformer(fakeKubeClient, resyncPeriod) - pvcInformer := informers.CreateSharedPVCIndexInformer(fakeKubeClient, resyncPeriod) - pvInformer := informers.CreateSharedPVIndexInformer(fakeKubeClient, resyncPeriod) + podInformer := informers.NewPodInformer(fakeKubeClient, resyncPeriod) + nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod) + pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod) + pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod) // Act _, err := NewAttachDetachController( diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index 5ba08fcdefa..4b2e564518c 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -47,7 +47,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { fakeKubeClient := controllervolumetesting.CreateTestClient() ad := operationexecutor.NewOperationExecutor( fakeKubeClient, volumePluginMgr) - nodeInformer := informers.CreateSharedNodeIndexInformer( + nodeInformer := informers.NewNodeInformer( fakeKubeClient, resyncPeriod) nsu := statusupdater.NewNodeStatusUpdater( fakeKubeClient, nodeInformer, asw) diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index a9a72317a84..780d15f47fc 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -141,7 +141,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl resyncPeriodFunc := func() time.Duration { return resyncPeriod } - podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) + podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) rm := replicaset.NewReplicaSetController( podInformer, internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 1b23a8940d4..1aaed555e01 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -138,7 +138,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl resyncPeriodFunc := func() time.Duration { return resyncPeriod } - podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) + podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) rm := replication.NewReplicationManager( podInformer, internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),