Don't reuse summary informer after the informer is stopped

Darren Shepherd 2020-05-22 15:34:43 -07:00
parent 9e4ed62a47
commit a48c49f660

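A reading of the change (editor's summary, not wording from the commit): before this commit the cache held a typed map of informers plus a shared SummarySharedInformerFactory, so a re-added GVR could be wired to an informer created, and possibly already stopped, on an earlier pass; a client-go SharedIndexInformer is not designed to be run again once its stop channel has closed. After the commit the cache keeps only a summary client and builds a brand-new summary informer for every watcher, registers the event handlers on it, and starts it right away. Below is a minimal sketch of that per-watcher flow, using only calls that appear in the diff; the summary client import path and the exact signatures are assumptions inferred from the diff, not verified against the wrangler API.

package sketch

import (
	"context"
	"time"

	"github.com/rancher/wrangler/pkg/summary/client" // assumed import path for the summary client used below
	"github.com/rancher/wrangler/pkg/summary/informer"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	schema2 "k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
)

// startFreshWatch is a hypothetical helper mirroring the loop body in OnSchemas:
// it builds a brand-new summary informer for gvr and runs it until ctx is
// cancelled. Nothing is cached, so a later call for the same GVR gets a fresh
// informer instead of one whose stop channel has already been closed.
func startFreshWatch(ctx context.Context, dyn dynamic.Interface, gvr schema2.GroupVersionResource) cache.SharedIndexInformer {
	summaryClient := client.NewForDynamicClient(dyn)
	inf := informer.NewFilteredSummaryInformer(summaryClient, gvr, metav1.NamespaceAll, 2*time.Hour,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil).Informer()
	go inf.Run(ctx.Done())
	return inf
}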

@@ -13,6 +13,7 @@ import (
 	"github.com/rancher/wrangler/pkg/summary/informer"
 	"github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	schema2 "k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/dynamic"
@@ -44,17 +45,15 @@ type watcher struct {
 	informer cache.SharedIndexInformer
 	gvk      schema2.GroupVersionKind
 	gvr      schema2.GroupVersionResource
-	start    bool
 }
 
 type clusterCache struct {
 	sync.RWMutex
 
-	ctx             context.Context
-	typed           map[schema2.GroupVersionKind]cache.SharedIndexInformer
-	informerFactory informer.SummarySharedInformerFactory
-	watchers        map[schema2.GroupVersionResource]*watcher
-	workqueue       workqueue.DelayingInterface
+	ctx           context.Context
+	summaryClient client.Interface
+	watchers      map[schema2.GroupVersionResource]*watcher
+	workqueue     workqueue.DelayingInterface
 
 	addHandlers    cancelCollection
 	removeHandlers cancelCollection
@@ -63,11 +62,10 @@ type clusterCache struct {
 
 func NewClusterCache(ctx context.Context, dynamicClient dynamic.Interface) ClusterCache {
 	c := &clusterCache{
-		ctx:             ctx,
-		typed:           map[schema2.GroupVersionKind]cache.SharedIndexInformer{},
-		informerFactory: informer.NewSummarySharedInformerFactory(client.NewForDynamicClient(dynamicClient), 2*time.Hour),
-		watchers:        map[schema2.GroupVersionResource]*watcher{},
-		workqueue:       workqueue.NewNamedDelayingQueue("cluster-cache"),
+		ctx:           ctx,
+		summaryClient: client.NewForDynamicClient(dynamicClient),
+		watchers:      map[schema2.GroupVersionResource]*watcher{},
+		workqueue:     workqueue.NewNamedDelayingQueue("cluster-cache"),
 	}
 	go c.start()
 	return c
@@ -130,8 +128,8 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
 	defer h.Unlock()
 
 	var (
-		toStart = map[schema2.GroupVersionResource]*watcher{}
-		gvrs    = map[schema2.GroupVersionResource]bool{}
+		gvrs   = map[schema2.GroupVersionResource]bool{}
+		toWait []*watcher
 	)
 
 	for _, id := range schemas.IDs() {
@@ -144,26 +142,26 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
 		gvk := attributes.GVK(schema)
 
 		gvrs[gvr] = true
-		w := h.watchers[gvr]
-		if w != nil {
+		if h.watchers[gvr] != nil {
 			continue
 		}
+
+		summaryInformer := informer.NewFilteredSummaryInformer(h.summaryClient, gvr, metav1.NamespaceAll, 2*time.Hour,
+			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil)
 		ctx, cancel := context.WithCancel(h.ctx)
-		w = &watcher{
+		w := &watcher{
 			ctx:      ctx,
 			cancel:   cancel,
 			gvk:      gvk,
 			gvr:      gvr,
-			informer: h.typed[gvk],
+			informer: summaryInformer.Informer(),
 		}
-		toStart[gvr] = w
 		h.watchers[gvr] = w
+		toWait = append(toWait, w)
 
-		if w.informer == nil {
-			w.informer = h.informerFactory.ForResource(gvr).Informer()
-			w.start = true
-			h.addResourceEventHandler(gvr, w.informer)
-		}
+		logrus.Infof("Watching metadata for %s", w.gvk)
+		h.addResourceEventHandler(w.gvr, w.informer)
+		go w.informer.Run(w.ctx.Done())
 	}
 
 	for gvr, w := range h.watchers {
@@ -174,24 +172,15 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
 		}
 	}
 
-	var toWait []*watcher
-
-	for _, w := range toStart {
-		if !w.start {
-			continue
-		}
-		w.start = false
-		logrus.Infof("Watching metadata for %s", w.gvk)
-		go w.informer.Run(w.ctx.Done())
-		toWait = append(toWait, w)
-	}
-
 	for _, w := range toWait {
-		cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced)
-	}
-
-	for _, w := range toStart {
-		h.watchers[w.gvr] = w
+		ctx, cancel := context.WithTimeout(w.ctx, 15*time.Minute)
+		if !cache.WaitForCacheSync(ctx.Done(), w.informer.HasSynced) {
+			logrus.Errorf("failed to sync cache for %v", w.gvk)
+			cancel()
+			w.cancel()
+			delete(h.watchers, w.gvr)
+		}
+		cancel()
 	}
 
 	return nil
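The last hunk also changes how the caller waits for the new informers: instead of waiting with only the watcher's own context as a bound, the new code caps the wait at 15 minutes and, if the informer never syncs, cancels the watcher and removes it from the map so a later OnSchemas pass can build a fresh one. Below is a small self-contained sketch of that pattern using plain client-go primitives; the helper name and parameters are illustrative, not part of the commit.

package sketch

import (
	"context"
	"time"

	"k8s.io/client-go/tools/cache"
)

// waitOrTearDown is a hypothetical helper showing the bounded-sync pattern from
// the hunk above: block until the informer has synced or a 15-minute budget runs
// out; on failure, cancel the watcher so its informer stops, and report false so
// the caller can drop the watcher from its map and recreate it later.
func waitOrTearDown(watchCtx context.Context, cancelWatch context.CancelFunc, inf cache.SharedIndexInformer) bool {
	syncCtx, cancel := context.WithTimeout(watchCtx, 15*time.Minute)
	defer cancel()

	if !cache.WaitForCacheSync(syncCtx.Done(), inf.HasSynced) {
		cancelWatch() // stop the informer; a fresh one will be built on the next pass
		return false
	}
	return true
}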