mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-05 11:16:23 +00:00
client-go: support Shutdown()
for metadata and dynamic informers (#114434)
* client-go: support `Shutdown()` for metadata and dynamic informers Followup to https://github.com/kubernetes/kubernetes/pull/112200, specifically https://github.com/kubernetes/kubernetes/pull/112200#issuecomment-1344812038. * add comments * Defer lock Kubernetes-commit: b99fe0d5b9896dd3fe9a2c1bc3b399a18ad080d2
This commit is contained in:
parent
9ea785f819
commit
089d04441d
@ -61,6 +61,12 @@ type dynamicSharedInformerFactory struct {
|
||||
// This allows Start() to be called multiple times safely.
|
||||
startedInformers map[schema.GroupVersionResource]bool
|
||||
tweakListOptions TweakListOptionsFunc
|
||||
|
||||
// wg tracks how many goroutines were started.
|
||||
wg sync.WaitGroup
|
||||
// shuttingDown is true when Shutdown has been called. It may still be running
|
||||
// because it needs to wait for goroutines.
|
||||
shuttingDown bool
|
||||
}
|
||||
|
||||
var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
|
||||
@ -86,9 +92,21 @@ func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
if f.shuttingDown {
|
||||
return
|
||||
}
|
||||
|
||||
for informerType, informer := range f.informers {
|
||||
if !f.startedInformers[informerType] {
|
||||
go informer.Informer().Run(stopCh)
|
||||
f.wg.Add(1)
|
||||
// We need a new variable in each loop iteration,
|
||||
// otherwise the goroutine would use the loop variable
|
||||
// and that keeps changing.
|
||||
informer := informer.Informer()
|
||||
go func() {
|
||||
defer f.wg.Done()
|
||||
informer.Run(stopCh)
|
||||
}()
|
||||
f.startedInformers[informerType] = true
|
||||
}
|
||||
}
|
||||
@ -116,6 +134,15 @@ func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
|
||||
return res
|
||||
}
|
||||
|
||||
func (f *dynamicSharedInformerFactory) Shutdown() {
|
||||
// Will return immediately if there is nothing to wait for.
|
||||
defer f.wg.Wait()
|
||||
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.shuttingDown = true
|
||||
}
|
||||
|
||||
// NewFilteredDynamicInformer constructs a new informer for a dynamic type.
|
||||
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
|
||||
return &dynamicInformer{
|
||||
|
@ -24,9 +24,28 @@ import (
|
||||
|
||||
// DynamicSharedInformerFactory provides access to a shared informer and lister for dynamic client
|
||||
type DynamicSharedInformerFactory interface {
|
||||
// Start initializes all requested informers. They are handled in goroutines
|
||||
// which run until the stop channel gets closed.
|
||||
Start(stopCh <-chan struct{})
|
||||
|
||||
// ForResource gives generic access to a shared informer of the matching type.
|
||||
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
|
||||
|
||||
// WaitForCacheSync blocks until all started informers' caches were synced
|
||||
// or the stop channel gets closed.
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
|
||||
|
||||
// Shutdown marks a factory as shutting down. At that point no new
|
||||
// informers can be started anymore and Start will return without
|
||||
// doing anything.
|
||||
//
|
||||
// In addition, Shutdown blocks until all goroutines have terminated. For that
|
||||
// to happen, the close channel(s) that they were started with must be closed,
|
||||
// either before Shutdown gets called or while it is waiting.
|
||||
//
|
||||
// Shutdown may be called multiple times, even concurrently. All such calls will
|
||||
// block until all goroutines have terminated.
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
// TweakListOptionsFunc defines the signature of a helper function
|
||||
|
@ -60,6 +60,11 @@ type metadataSharedInformerFactory struct {
|
||||
// This allows Start() to be called multiple times safely.
|
||||
startedInformers map[schema.GroupVersionResource]bool
|
||||
tweakListOptions TweakListOptionsFunc
|
||||
// wg tracks how many goroutines were started.
|
||||
wg sync.WaitGroup
|
||||
// shuttingDown is true when Shutdown has been called. It may still be running
|
||||
// because it needs to wait for goroutines.
|
||||
shuttingDown bool
|
||||
}
|
||||
|
||||
var _ SharedInformerFactory = &metadataSharedInformerFactory{}
|
||||
@ -85,9 +90,21 @@ func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
if f.shuttingDown {
|
||||
return
|
||||
}
|
||||
|
||||
for informerType, informer := range f.informers {
|
||||
if !f.startedInformers[informerType] {
|
||||
go informer.Informer().Run(stopCh)
|
||||
f.wg.Add(1)
|
||||
// We need a new variable in each loop iteration,
|
||||
// otherwise the goroutine would use the loop variable
|
||||
// and that keeps changing.
|
||||
informer := informer.Informer()
|
||||
go func() {
|
||||
defer f.wg.Done()
|
||||
informer.Run(stopCh)
|
||||
}()
|
||||
f.startedInformers[informerType] = true
|
||||
}
|
||||
}
|
||||
@ -115,6 +132,15 @@ func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
|
||||
return res
|
||||
}
|
||||
|
||||
func (f *metadataSharedInformerFactory) Shutdown() {
|
||||
// Will return immediately if there is nothing to wait for.
|
||||
defer f.wg.Wait()
|
||||
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.shuttingDown = true
|
||||
}
|
||||
|
||||
// NewFilteredMetadataInformer constructs a new informer for a metadata type.
|
||||
func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
|
||||
return &metadataInformer{
|
||||
|
@ -24,9 +24,28 @@ import (
|
||||
|
||||
// SharedInformerFactory provides access to a shared informer and lister for dynamic client
|
||||
type SharedInformerFactory interface {
|
||||
// Start initializes all requested informers. They are handled in goroutines
|
||||
// which run until the stop channel gets closed.
|
||||
Start(stopCh <-chan struct{})
|
||||
|
||||
// ForResource gives generic access to a shared informer of the matching type.
|
||||
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
|
||||
|
||||
// WaitForCacheSync blocks until all started informers' caches were synced
|
||||
// or the stop channel gets closed.
|
||||
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
|
||||
|
||||
// Shutdown marks a factory as shutting down. At that point no new
|
||||
// informers can be started anymore and Start will return without
|
||||
// doing anything.
|
||||
//
|
||||
// In addition, Shutdown blocks until all goroutines have terminated. For that
|
||||
// to happen, the close channel(s) that they were started with must be closed,
|
||||
// either before Shutdown gets called or while it is waiting.
|
||||
//
|
||||
// Shutdown may be called multiple times, even concurrently. All such calls will
|
||||
// block until all goroutines have terminated.
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
// TweakListOptionsFunc defines the signature of a helper function
|
||||
|
Loading…
Reference in New Issue
Block a user