diff --git a/pkg/clustercache/controller.go b/pkg/clustercache/controller.go index 26f4f2f9..0035b1fa 100644 --- a/pkg/clustercache/controller.go +++ b/pkg/clustercache/controller.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/wrangler/pkg/summary/informer" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" schema2 "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -44,17 +45,15 @@ type watcher struct { informer cache.SharedIndexInformer gvk schema2.GroupVersionKind gvr schema2.GroupVersionResource - start bool } type clusterCache struct { sync.RWMutex - ctx context.Context - typed map[schema2.GroupVersionKind]cache.SharedIndexInformer - informerFactory informer.SummarySharedInformerFactory - watchers map[schema2.GroupVersionResource]*watcher - workqueue workqueue.DelayingInterface + ctx context.Context + summaryClient client.Interface + watchers map[schema2.GroupVersionResource]*watcher + workqueue workqueue.DelayingInterface addHandlers cancelCollection removeHandlers cancelCollection @@ -63,11 +62,10 @@ type clusterCache struct { func NewClusterCache(ctx context.Context, dynamicClient dynamic.Interface) ClusterCache { c := &clusterCache{ - ctx: ctx, - typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{}, - informerFactory: informer.NewSummarySharedInformerFactory(client.NewForDynamicClient(dynamicClient), 2*time.Hour), - watchers: map[schema2.GroupVersionResource]*watcher{}, - workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), + ctx: ctx, + summaryClient: client.NewForDynamicClient(dynamicClient), + watchers: map[schema2.GroupVersionResource]*watcher{}, + workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), } go c.start() return c @@ -130,8 +128,8 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { defer h.Unlock() var ( - toStart = map[schema2.GroupVersionResource]*watcher{} - gvrs = map[schema2.GroupVersionResource]bool{} + gvrs = map[schema2.GroupVersionResource]bool{} + toWait []*watcher ) for _, id := range schemas.IDs() { @@ -144,26 +142,26 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { gvk := attributes.GVK(schema) gvrs[gvr] = true - w := h.watchers[gvr] - if w != nil { + if h.watchers[gvr] != nil { continue } + summaryInformer := informer.NewFilteredSummaryInformer(h.summaryClient, gvr, metav1.NamespaceAll, 2*time.Hour, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil) ctx, cancel := context.WithCancel(h.ctx) - w = &watcher{ + w := &watcher{ ctx: ctx, cancel: cancel, gvk: gvk, gvr: gvr, - informer: h.typed[gvk], + informer: summaryInformer.Informer(), } - toStart[gvr] = w + h.watchers[gvr] = w + toWait = append(toWait, w) - if w.informer == nil { - w.informer = h.informerFactory.ForResource(gvr).Informer() - w.start = true - h.addResourceEventHandler(gvr, w.informer) - } + logrus.Infof("Watching metadata for %s", w.gvk) + h.addResourceEventHandler(w.gvr, w.informer) + go w.informer.Run(w.ctx.Done()) } for gvr, w := range h.watchers { @@ -174,24 +172,15 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { } } - var toWait []*watcher - - for _, w := range toStart { - if !w.start { - continue - } - w.start = false - logrus.Infof("Watching metadata for %s", w.gvk) - go w.informer.Run(w.ctx.Done()) - toWait = append(toWait, w) - } - for _, w := range toWait { - cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced) - } - - for _, w := range toStart { - h.watchers[w.gvr] = w + ctx, cancel := context.WithTimeout(w.ctx, 15*time.Minute) + if !cache.WaitForCacheSync(ctx.Done(), w.informer.HasSynced) { + logrus.Errorf("failed to sync cache for %v", w.gvk) + cancel() + w.cancel() + delete(h.watchers, w.gvr) + } + cancel() } return nil