Merge pull request #29978 from hodovska/sharedInformer-fixup

Automatic merge from submit-queue

SharedInformerFactory: usage and fixes

Follow-up for #26709
This commit is contained in:
Kubernetes Submit Queue 2016-08-04 09:00:23 -07:00 committed by GitHub
commit 42a12a4cd6
14 changed files with 64 additions and 158 deletions

View File

@ -30,7 +30,6 @@ import (
"net/http" "net/http"
"net/http/pprof" "net/http/pprof"
"os" "os"
"reflect"
"strconv" "strconv"
"time" "time"
@ -50,7 +49,6 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/deployment"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers" "k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/job"
@ -197,22 +195,14 @@ func Run(s *options.CMServer) error {
} }
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error { func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)()) sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)())
nodeInformer := informers.CreateSharedNodeIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-informer")), ResyncPeriod(s)())
pvcInformer := informers.CreateSharedPVCIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pvc-informer")), ResyncPeriod(s)())
pvInformer := informers.CreateSharedPVIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pv-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedIndexInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer
informers[reflect.TypeOf(&api.Node{})] = nodeInformer
informers[reflect.TypeOf(&api.PersistentVolumeClaim{})] = pvcInformer
informers[reflect.TypeOf(&api.PersistentVolume{})] = pvInformer
go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))). go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop) Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go replicationcontroller.NewReplicationManager( go replicationcontroller.NewReplicationManager(
podInformer, sharedInformers.Pods().Informer(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
ResyncPeriod(s), ResyncPeriod(s),
replicationcontroller.BurstReplicas, replicationcontroller.BurstReplicas,
@ -240,7 +230,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if err != nil { if err != nil {
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err) glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
} }
nodeController, err := nodecontroller.NewNodeController(podInformer, cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")), nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration, s.PodEvictionTimeout.Duration, s.DeletingPodsQps, s.NodeMonitorGracePeriod.Duration,
s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs) int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
@ -284,7 +274,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
KubeClient: resourceQuotaControllerClient, KubeClient: resourceQuotaControllerClient,
ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration), ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
Registry: resourceQuotaRegistry, Registry: resourceQuotaRegistry,
ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(podInformer, resourceQuotaControllerClient), ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers.Pods().Informer(), resourceQuotaControllerClient),
ReplenishmentResyncPeriod: ResyncPeriod(s), ReplenishmentResyncPeriod: ResyncPeriod(s),
GroupKindsToReplenish: groupKindsToReplenish, GroupKindsToReplenish: groupKindsToReplenish,
} }
@ -344,14 +334,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "daemonsets") { if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller") glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)). go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)).
Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop) Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
if containsResource(resources, "jobs") { if containsResource(resources, "jobs") {
glog.Infof("Starting job controller") glog.Infof("Starting job controller")
go job.NewJobController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))). go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
Run(int(s.ConcurrentJobSyncs), wait.NeverStop) Run(int(s.ConcurrentJobSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
@ -365,7 +355,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "replicasets") { if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller") glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector). go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
Run(int(s.ConcurrentRSSyncs), wait.NeverStop) Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
} }
@ -380,7 +370,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Infof("Starting PetSet controller") glog.Infof("Starting PetSet controller")
resyncPeriod := ResyncPeriod(s)() resyncPeriod := ResyncPeriod(s)()
go petset.NewPetSetController( go petset.NewPetSetController(
podInformer, sharedInformers.Pods().Informer(),
// TODO: Switch to using clientset // TODO: Switch to using clientset
kubeClient, kubeClient,
resyncPeriod, resyncPeriod,
@ -410,10 +400,10 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
attachDetachController, attachDetachControllerErr := attachDetachController, attachDetachControllerErr :=
attachdetach.NewAttachDetachController( attachdetach.NewAttachDetachController(
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")),
podInformer, sharedInformers.Pods().Informer(),
nodeInformer, sharedInformers.Nodes().Informer(),
pvcInformer, sharedInformers.PersistentVolumeClaims().Informer(),
pvInformer, sharedInformers.PersistentVolumes().Informer(),
cloud, cloud,
ProbeAttachableVolumePlugins(s.VolumeConfiguration)) ProbeAttachableVolumePlugins(s.VolumeConfiguration))
if attachDetachControllerErr != nil { if attachDetachControllerErr != nil {
@ -497,10 +487,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
} }
} }
// run the shared informers sharedInformers.Start(stop)
for _, informer := range informers {
go informer.Run(wait.NeverStop)
}
select {} select {}
} }

View File

@ -205,7 +205,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
} }
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController { func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize) dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer dsc.internalPodInformer = podInformer

View File

@ -113,7 +113,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
podInformer := informers.CreateSharedPodIndexInformer(client, resyncPeriod()) podInformer := informers.NewPodInformer(client, resyncPeriod())
e := NewEndpointController(podInformer, client) e := NewEndpointController(podInformer, client)
e.internalPodInformer = podInformer e.internalPodInformer = podInformer

View File

@ -22,8 +22,6 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
) )
// PodInformer is type of SharedIndexInformer which watches and lists all pods. // PodInformer is type of SharedIndexInformer which watches and lists all pods.
@ -43,26 +41,12 @@ func (f *podInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
informerObj := &api.Pod{} informerType := reflect.TypeOf(&api.Pod{})
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType] informer, exists := f.informers[informerType]
if exists { if exists {
return informer return informer
} }
informer = NewPodInformer(f.client, f.defaultResync)
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer f.informers[informerType] = informer
return informer return informer
@ -92,25 +76,13 @@ type namespaceInformer struct {
func (f *namespaceInformer) Informer() framework.SharedIndexInformer { func (f *namespaceInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
informerObj := &api.Namespace{}
informerType := reflect.TypeOf(informerObj) informerType := reflect.TypeOf(&api.Namespace{})
informer, exists := f.informers[informerType] informer, exists := f.informers[informerType]
if exists { if exists {
return informer return informer
} }
informer = framework.NewSharedIndexInformer( informer = NewNamespaceInformer(f.client, f.defaultResync)
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Namespaces().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Namespaces().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{},
)
f.informers[informerType] = informer f.informers[informerType] = informer
return informer return informer
@ -141,26 +113,12 @@ func (f *nodeInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
informerObj := &api.Node{} informerType := reflect.TypeOf(&api.Node{})
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType] informer, exists := f.informers[informerType]
if exists { if exists {
return informer return informer
} }
informer = NewNodeInformer(f.client, f.defaultResync)
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().Nodes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().Nodes().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer f.informers[informerType] = informer
return informer return informer
@ -191,26 +149,12 @@ func (f *pvcInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
informerObj := &api.PersistentVolumeClaim{} informerType := reflect.TypeOf(&api.PersistentVolumeClaim{})
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType] informer, exists := f.informers[informerType]
if exists { if exists {
return informer return informer
} }
informer = NewPVCInformer(f.client, f.defaultResync)
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer f.informers[informerType] = informer
return informer return informer
@ -241,26 +185,12 @@ func (f *pvInformer) Informer() framework.SharedIndexInformer {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
informerObj := &api.PersistentVolume{} informerType := reflect.TypeOf(&api.PersistentVolume{})
informerType := reflect.TypeOf(informerObj)
informer, exists := f.informers[informerType] informer, exists := f.informers[informerType]
if exists { if exists {
return informer return informer
} }
informer = NewPVInformer(f.client, f.defaultResync)
informer = framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Core().PersistentVolumes().Watch(options)
},
},
informerObj,
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer f.informers[informerType] = informer
return informer return informer

View File

@ -46,7 +46,11 @@ type sharedInformerFactory struct {
client clientset.Interface client clientset.Interface
lock sync.Mutex lock sync.Mutex
defaultResync time.Duration defaultResync time.Duration
informers map[reflect.Type]framework.SharedIndexInformer informers map[reflect.Type]framework.SharedIndexInformer
// startedInformers is used for tracking which informers have been started
// this allows calling of Start method multiple times
startedInformers map[reflect.Type]bool
} }
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory // NewSharedInformerFactory constructs a new instance of sharedInformerFactory
@ -55,6 +59,7 @@ func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Dur
client: client, client: client,
defaultResync: defaultResync, defaultResync: defaultResync,
informers: make(map[reflect.Type]framework.SharedIndexInformer), informers: make(map[reflect.Type]framework.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
} }
} }
@ -63,8 +68,11 @@ func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
for _, informer := range s.informers { for informerType, informer := range s.informers {
if !s.startedInformers[informerType] {
go informer.Run(stopCh) go informer.Run(stopCh)
s.startedInformers[informerType] = true
}
} }
} }
@ -93,27 +101,8 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
return &pvInformer{sharedInformerFactory: f} return &pvInformer{sharedInformerFactory: f}
} }
// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods // NewPodInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod,
cache.Indexers{},
)
return sharedInformer
}
// CreateSharedPodIndexInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer( sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -131,8 +120,8 @@ func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.
return sharedIndexInformer return sharedIndexInformer
} }
// CreateSharedNodeIndexInformer returns a SharedIndexInformer that lists and watches all nodes // NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes
func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer( sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -149,8 +138,8 @@ func CreateSharedNodeIndexInformer(client clientset.Interface, resyncPeriod time
return sharedIndexInformer return sharedIndexInformer
} }
// CreateSharedPVCIndexInformer returns a SharedIndexInformer that lists and watches all PVCs // NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs
func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer( sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -167,8 +156,8 @@ func CreateSharedPVCIndexInformer(client clientset.Interface, resyncPeriod time.
return sharedIndexInformer return sharedIndexInformer
} }
// CreateSharedPVIndexInformer returns a SharedIndexInformer that lists and watches all PVs // NewPVInformer returns a SharedIndexInformer that lists and watches all PVs
func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer( sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -185,8 +174,8 @@ func CreateSharedPVIndexInformer(client clientset.Interface, resyncPeriod time.D
return sharedIndexInformer return sharedIndexInformer
} }
// CreateSharedNamespaceIndexInformer returns a SharedIndexInformer that lists and watches namespaces // NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces
func CreateSharedNamespaceIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer { func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer( sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {

View File

@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie
} }
func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
jm := NewJobController(podInformer, kubeClient) jm := NewJobController(podInformer, kubeClient)
jm.internalPodInformer = podInformer jm.internalPodInformer = podInformer

View File

@ -343,7 +343,7 @@ func NewNodeControllerFromClient(
serviceCIDR *net.IPNet, serviceCIDR *net.IPNet,
nodeCIDRMaskSize int, nodeCIDRMaskSize int,
allocateNodeCIDRs bool) (*NodeController, error) { allocateNodeCIDRs bool) (*NodeController, error) {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, controller.NoResyncPeriodFunc()) podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod, nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, nodeMonitorGracePeriod,
nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs) nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
if err != nil { if err != nil {

View File

@ -184,7 +184,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController { func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false garbageCollectorEnabled := false
rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rsc.internalPodInformer = podInformer rsc.internalPodInformer = podInformer

View File

@ -189,7 +189,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame
// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests. // NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false garbageCollectorEnabled := false
rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rm.internalPodInformer = podInformer rm.internalPodInformer = podInformer
@ -198,7 +198,7 @@ func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interfac
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer. // NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager { func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod()) podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false garbageCollectorEnabled := false
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled) rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rm.internalPodInformer = podInformer rm.internalPodInformer = podInformer

View File

@ -129,7 +129,7 @@ func (r *replenishmentControllerFactory) NewController(options *ReplenishmentCon
break break
} }
r.podInformer = informers.CreateSharedPodInformer(r.kubeClient, options.ResyncPeriod()) r.podInformer = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
result = r.podInformer result = r.podInformer
case api.Kind("Service"): case api.Kind("Service"):

View File

@ -28,10 +28,10 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
// Arrange // Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
resyncPeriod := 5 * time.Minute resyncPeriod := 5 * time.Minute
podInformer := informers.CreateSharedPodIndexInformer(fakeKubeClient, resyncPeriod) podInformer := informers.NewPodInformer(fakeKubeClient, resyncPeriod)
nodeInformer := informers.CreateSharedNodeIndexInformer(fakeKubeClient, resyncPeriod) nodeInformer := informers.NewNodeInformer(fakeKubeClient, resyncPeriod)
pvcInformer := informers.CreateSharedPVCIndexInformer(fakeKubeClient, resyncPeriod) pvcInformer := informers.NewPVCInformer(fakeKubeClient, resyncPeriod)
pvInformer := informers.CreateSharedPVIndexInformer(fakeKubeClient, resyncPeriod) pvInformer := informers.NewPVInformer(fakeKubeClient, resyncPeriod)
// Act // Act
_, err := NewAttachDetachController( _, err := NewAttachDetachController(

View File

@ -47,7 +47,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
fakeKubeClient := controllervolumetesting.CreateTestClient() fakeKubeClient := controllervolumetesting.CreateTestClient()
ad := operationexecutor.NewOperationExecutor( ad := operationexecutor.NewOperationExecutor(
fakeKubeClient, volumePluginMgr) fakeKubeClient, volumePluginMgr)
nodeInformer := informers.CreateSharedNodeIndexInformer( nodeInformer := informers.NewNodeInformer(
fakeKubeClient, resyncPeriod) fakeKubeClient, resyncPeriod)
nsu := statusupdater.NewNodeStatusUpdater( nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw) fakeKubeClient, nodeInformer, asw)

View File

@ -141,7 +141,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
resyncPeriodFunc := func() time.Duration { resyncPeriodFunc := func() time.Duration {
return resyncPeriod return resyncPeriod
} }
podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
rm := replicaset.NewReplicaSetController( rm := replicaset.NewReplicaSetController(
podInformer, podInformer,
internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")), internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")),

View File

@ -138,7 +138,7 @@ func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *repl
resyncPeriodFunc := func() time.Duration { resyncPeriodFunc := func() time.Duration {
return resyncPeriod return resyncPeriod
} }
podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod) podInformer := informers.NewPodInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
rm := replication.NewReplicationManager( rm := replication.NewReplicationManager(
podInformer, podInformer,
internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")), internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),