Merge pull request #32815 from deads2k/controller-02-daemonset-informer

Automatic merge from submit-queue

convert daemonset controller to shared informers

Convert the daemonset controller completely to `SharedInformers` for its list/watch resources.
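For reviewers who haven't used the shared-informer plumbing yet, here is a minimal sketch of the wiring pattern this PR adopts. It is illustrative only, not code from this PR; `resyncPeriod`, `lookupCacheSize`, and `workers` stand in for the controller-manager flag values used in the hunks below:

```go
// Build one factory per client; every informer requested from it is shared.
client := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller"))
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)

// The controller takes typed informers instead of building private
// list/watch loops; it registers event handlers and keeps only listers.
go daemon.NewDaemonSetsController(
	informerFactory.DaemonSets(),
	informerFactory.Pods(),
	informerFactory.Nodes(),
	client,
	lookupCacheSize,
).Run(workers, wait.NeverStop)

// Start() runs every informer requested above; informers already running
// elsewhere are reused rather than duplicated.
informerFactory.Start(wait.NeverStop)
```

The payoff is a single cache and watch per resource type shared across controllers, rather than one per controller — which is what the deletions in the daemon controller below amount to.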

@kubernetes/rh-cluster-infra @ncdc
Authored by Kubernetes Submit Queue on 2016-09-16 09:39:57 -07:00, committed by GitHub
commit 2ca15b9f76
7 changed files with 224 additions and 191 deletions

View File

@@ -341,7 +341,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)).
+		go daemon.NewDaemonSetsController(sharedInformers.DaemonSets(), sharedInformers.Pods(), sharedInformers.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)).
 			Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
 		time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
 	}

View File

@@ -42,6 +42,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/daemon"
 	"k8s.io/kubernetes/pkg/controller/deployment"
 	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
+	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/controller/job"
 	namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
 	nodecontroller "k8s.io/kubernetes/pkg/controller/node"
@@ -262,8 +263,11 @@ func (s *CMServer) Run(_ []string) error {
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsControllerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, int(s.LookupCacheSizeForDaemonSet)).
+		informerFactory := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod())
+		go daemon.NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), int(s.LookupCacheSizeForDaemonSet)).
 			Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
+		informerFactory.Start(wait.NeverStop)
 	}
 	if containsResource(resources, "jobs") {

View File

@@ -34,13 +34,11 @@ import (
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/labels"
-	"k8s.io/kubernetes/pkg/runtime"
 	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/util/workqueue"
-	"k8s.io/kubernetes/pkg/watch"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -66,13 +64,6 @@ type DaemonSetsController struct {
 	eventRecorder record.EventRecorder
 	podControl    controller.PodControlInterface

-	// internalPodInformer is used to hold a personal informer. If we're using
-	// a normal shared informer, then the informer will be started for us. If
-	// we have a personal informer, we must start it ourselves. If you start
-	// the controller using NewDaemonSetsController(passing SharedInformer), this
-	// will be null
-	internalPodInformer cache.SharedInformer
-
 	// An dsc is temporarily suspended after creating/deleting these many replicas.
 	// It resumes normal action after observing the watch events for them.
 	burstReplicas int
@@ -82,17 +73,11 @@ type DaemonSetsController struct {
 	// A TTLCache of pod creates/deletes each ds expects to see
 	expectations controller.ControllerExpectationsInterface
 	// A store of daemon sets
-	dsStore cache.StoreToDaemonSetLister
+	dsStore *cache.StoreToDaemonSetLister
 	// A store of pods
-	podStore cache.StoreToPodLister
+	podStore *cache.StoreToPodLister
 	// A store of nodes
-	nodeStore cache.StoreToNodeLister
-	// Watches changes to all daemon sets.
-	dsController *cache.Controller
-	// Watches changes to all pods
-	podController cache.ControllerInterface
-	// Watches changes to all nodes.
-	nodeController *cache.Controller
+	nodeStore *cache.StoreToNodeLister
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
@@ -106,7 +91,7 @@ type DaemonSetsController struct {
 	queue workqueue.RateLimitingInterface
 }

-func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
+func NewDaemonSetsController(daemonSetInformer informers.DaemonSetInformer, podInformer informers.PodInformer, nodeInformer informers.NodeInformer, kubeClient clientset.Interface, lookupCacheSize int) *DaemonSetsController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	// TODO: remove the wrapper when every clients have moved to use the clientset.
@@ -126,20 +111,8 @@ func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 		expectations: controller.NewControllerExpectations(),
 		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
 	}
-	// Manage addition/update of daemon sets.
-	dsc.dsStore.Store, dsc.dsController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
-			},
-		},
-		&extensions.DaemonSet{},
-		// TODO: Can we have much longer period here?
-		FullDaemonSetResyncPeriod,
-		cache.ResourceEventHandlerFuncs{
+
+	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			ds := obj.(*extensions.DaemonSet)
 			glog.V(4).Infof("Adding daemon set %s", ds.Name)
@@ -167,52 +140,32 @@ func NewDaemonSetsController(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 			dsc.enqueueDaemonSet(curDS)
 		},
 		DeleteFunc: dsc.deleteDaemonset,
-		},
-	)
+	})
+	dsc.dsStore = daemonSetInformer.Lister()

 	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
 	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    dsc.addPod,
 		UpdateFunc: dsc.updatePod,
 		DeleteFunc: dsc.deletePod,
 	})
-	dsc.podStore.Indexer = podInformer.GetIndexer()
-	dsc.podController = podInformer.GetController()
-	dsc.podStoreSynced = podInformer.HasSynced
+	dsc.podStore = podInformer.Lister()
+	dsc.podStoreSynced = podInformer.Informer().HasSynced

-	// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change,
-	dsc.nodeStore.Store, dsc.nodeController = cache.NewInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return dsc.kubeClient.Core().Nodes().List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return dsc.kubeClient.Core().Nodes().Watch(options)
-			},
-		},
-		&api.Node{},
-		resyncPeriod(),
-		cache.ResourceEventHandlerFuncs{
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    dsc.addNode,
 		UpdateFunc: dsc.updateNode,
 	},
 	)
-	dsc.nodeStoreSynced = dsc.nodeController.HasSynced
+	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
+	dsc.nodeStore = nodeInformer.Lister()

 	dsc.syncHandler = dsc.syncDaemonSet
 	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
 }

-func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
-	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
-	dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
-	dsc.internalPodInformer = podInformer
-	return dsc
-}
-
 func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 	ds, ok := obj.(*extensions.DaemonSet)
 	if !ok {
@@ -237,9 +190,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer dsc.queue.ShutDown()

 	glog.Infof("Starting Daemon Sets controller manager")
-	go dsc.dsController.Run(stopCh)
-	go dsc.podController.Run(stopCh)
-	go dsc.nodeController.Run(stopCh)

 	if !cache.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
 		return
@@ -249,10 +199,6 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 		go wait.Until(dsc.runWorker, time.Second, stopCh)
 	}

-	if dsc.internalPodInformer != nil {
-		go dsc.internalPodInformer.Run(stopCh)
-	}
-
 	<-stopCh
 	glog.Infof("Shutting down Daemon Set Controller")
 }

View File

@@ -29,7 +29,9 @@ import (
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/client/restclient"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/securitycontext"
+	"k8s.io/kubernetes/pkg/util/wait"
 )

 var (
@@ -136,7 +138,11 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num

 func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
+	informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
+
+	manager := NewDaemonSetsController(informerFactory.DaemonSets(), informerFactory.Pods(), informerFactory.Nodes(), clientset, 0)
+	informerFactory.Start(wait.NeverStop)
+
 	manager.podStoreSynced = alwaysReady
 	manager.nodeStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}

View File

@@ -18,9 +18,13 @@ package informers

 import (
 	"reflect"
+	"time"

 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/watch"
 )

 // PodInformer is type of SharedIndexInformer which watches and lists all pods.
@@ -200,3 +204,94 @@ func (f *pvInformer) Lister() *cache.StoreToPVFetcher {
 	informer := f.Informer()
 	return &cache.StoreToPVFetcher{Store: informer.GetStore()}
 }
+
+// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
+func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+	sharedIndexInformer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return client.Core().Pods(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return client.Core().Pods(api.NamespaceAll).Watch(options)
+			},
+		},
+		&api.Pod{},
+		resyncPeriod,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+	)
+
+	return sharedIndexInformer
+}
+
+// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes
+func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+	sharedIndexInformer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return client.Core().Nodes().List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return client.Core().Nodes().Watch(options)
+			},
+		},
+		&api.Node{},
+		resyncPeriod,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+
+	return sharedIndexInformer
+}
+
+// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs
+func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+	sharedIndexInformer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
+			},
+		},
+		&api.PersistentVolumeClaim{},
+		resyncPeriod,
+		cache.Indexers{})
+
+	return sharedIndexInformer
+}
+
+// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs
+func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+	sharedIndexInformer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return client.Core().PersistentVolumes().List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return client.Core().PersistentVolumes().Watch(options)
+			},
+		},
+		&api.PersistentVolume{},
+		resyncPeriod,
+		cache.Indexers{})
+
+	return sharedIndexInformer
+}
+
+// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces
+func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+	sharedIndexInformer := cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return client.Core().Namespaces().List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return client.Core().Namespaces().Watch(options)
+			},
+		},
+		&api.Namespace{},
+		resyncPeriod,
+		cache.Indexers{})
+
+	return sharedIndexInformer
+}

View File

@@ -0,0 +1,70 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package informers
+
+import (
+	"reflect"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/apis/extensions"
+	"k8s.io/kubernetes/pkg/client/cache"
+	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/watch"
+)
+
+// DaemonSetInformer is a type of SharedIndexInformer which watches and lists all daemon sets.
+// Interface provides constructor for informer and lister for daemon sets
+type DaemonSetInformer interface {
+	Informer() cache.SharedIndexInformer
+	Lister() *cache.StoreToDaemonSetLister
+}
+
+type daemonSetInformer struct {
+	*sharedInformerFactory
+}
+
+func (f *daemonSetInformer) Informer() cache.SharedIndexInformer {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	informerType := reflect.TypeOf(&extensions.DaemonSet{})
+	informer, exists := f.informers[informerType]
+	if exists {
+		return informer
+	}
+
+	informer = cache.NewSharedIndexInformer(
+		&cache.ListWatch{
+			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+				return f.client.Extensions().DaemonSets(api.NamespaceAll).List(options)
+			},
+			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+				return f.client.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
+			},
+		},
+		&extensions.DaemonSet{},
+		f.defaultResync,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+	)
+	f.informers[informerType] = informer
+
+	return informer
+}
+
+func (f *daemonSetInformer) Lister() *cache.StoreToDaemonSetLister {
+	informer := f.Informer()
+	return &cache.StoreToDaemonSetLister{Store: informer.GetIndexer()}
+}

View File

@@ -21,11 +21,8 @@ import (
 	"sync"
 	"time"

-	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
-	"k8s.io/kubernetes/pkg/runtime"
-	"k8s.io/kubernetes/pkg/watch"
 )

 // SharedInformerFactory provides interface which holds unique informers for pods, nodes, namespaces, persistent volume
@@ -39,6 +36,8 @@ type SharedInformerFactory interface {
 	Namespaces() NamespaceInformer
 	PersistentVolumeClaims() PVCInformer
 	PersistentVolumes() PVInformer
+
+	DaemonSets() DaemonSetInformer
 }

 type sharedInformerFactory struct {
@@ -100,93 +99,6 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
 	return &pvInformer{sharedInformerFactory: f}
 }

-// NewPodInformer returns a SharedIndexInformer that lists and watches all pods
-func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-	sharedIndexInformer := cache.NewSharedIndexInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return client.Core().Pods(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return client.Core().Pods(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.Pod{},
-		resyncPeriod,
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-	)
-	return sharedIndexInformer
-}
-
-// NewNodeInformer returns a SharedIndexInformer that lists and watches all nodes
-func NewNodeInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-	sharedIndexInformer := cache.NewSharedIndexInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return client.Core().Nodes().List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return client.Core().Nodes().Watch(options)
-			},
-		},
-		&api.Node{},
-		resyncPeriod,
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
-	return sharedIndexInformer
-}
-
-// NewPVCInformer returns a SharedIndexInformer that lists and watches all PVCs
-func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-	sharedIndexInformer := cache.NewSharedIndexInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return client.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return client.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
-			},
-		},
-		&api.PersistentVolumeClaim{},
-		resyncPeriod,
-		cache.Indexers{})
-	return sharedIndexInformer
-}
-
-// NewPVInformer returns a SharedIndexInformer that lists and watches all PVs
-func NewPVInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-	sharedIndexInformer := cache.NewSharedIndexInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return client.Core().PersistentVolumes().List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return client.Core().PersistentVolumes().Watch(options)
-			},
-		},
-		&api.PersistentVolume{},
-		resyncPeriod,
-		cache.Indexers{})
-	return sharedIndexInformer
-}
-
-// NewNamespaceInformer returns a SharedIndexInformer that lists and watches namespaces
-func NewNamespaceInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-	sharedIndexInformer := cache.NewSharedIndexInformer(
-		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-				return client.Core().Namespaces().List(options)
-			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return client.Core().Namespaces().Watch(options)
-			},
-		},
-		&api.Namespace{},
-		resyncPeriod,
-		cache.Indexers{})
-	return sharedIndexInformer
-}
+
+func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer {
+	return &daemonSetInformer{sharedInformerFactory: f}
+}