From a44863b331f75c5d95103a6f11337447c5bd9c68 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Fri, 23 Oct 2020 13:57:57 -0700 Subject: [PATCH] Switch clustercache to index by gvk not gvr --- pkg/clustercache/controller.go | 74 +++++++++++++++--------- pkg/podimpersonation/podimpersonation.go | 10 ++-- pkg/resources/counts/counts.go | 24 ++++---- 3 files changed, 64 insertions(+), 44 deletions(-) diff --git a/pkg/clustercache/controller.go b/pkg/clustercache/controller.go index 48fc6dc..8da4904 100644 --- a/pkg/clustercache/controller.go +++ b/pkg/clustercache/controller.go @@ -21,11 +21,12 @@ import ( "k8s.io/client-go/util/workqueue" ) -type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error -type ChangeHandler func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error +type Handler func(gvr schema2.GroupVersionKind, key string, obj runtime.Object) error +type ChangeHandler func(gvr schema2.GroupVersionKind, key string, obj, oldObj runtime.Object) error type ClusterCache interface { - List(gvr schema2.GroupVersionResource) []interface{} + Get(gvk schema2.GroupVersionKind, namespace, name string) (interface{}, bool, error) + List(gvk schema2.GroupVersionKind) []interface{} OnAdd(ctx context.Context, handler Handler) OnRemove(ctx context.Context, handler Handler) OnChange(ctx context.Context, handler ChangeHandler) @@ -34,7 +35,7 @@ type ClusterCache interface { type event struct { add bool - gvr schema2.GroupVersionResource + gvk schema2.GroupVersionKind obj runtime.Object oldObj runtime.Object } @@ -52,7 +53,7 @@ type clusterCache struct { ctx context.Context summaryClient client.Interface - watchers map[schema2.GroupVersionResource]*watcher + watchers map[schema2.GroupVersionKind]*watcher workqueue workqueue.DelayingInterface addHandlers cancelCollection @@ -64,7 +65,7 @@ func NewClusterCache(ctx context.Context, dynamicClient dynamic.Interface) Clust c := &clusterCache{ ctx: ctx, summaryClient: client.NewForDynamicClient(dynamicClient), - watchers: map[schema2.GroupVersionResource]*watcher{}, + watchers: map[schema2.GroupVersionKind]*watcher{}, workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), } go c.start() @@ -90,14 +91,14 @@ func validSchema(schema *types.APISchema) bool { return true } -func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, informer cache.SharedIndexInformer) { +func (h *clusterCache) addResourceEventHandler(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) { informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { if rObj, ok := obj.(runtime.Object); ok { h.workqueue.Add(event{ add: true, obj: rObj, - gvr: gvr, + gvk: gvk, }) } }, @@ -107,7 +108,7 @@ func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, h.workqueue.Add(event{ obj: rObj, oldObj: rOldObj, - gvr: gvr, + gvk: gvk, }) } } @@ -116,7 +117,7 @@ func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, if rObj, ok := obj.(runtime.Object); ok { h.workqueue.Add(event{ obj: rObj, - gvr: gvr, + gvk: gvk, }) } }, @@ -128,7 +129,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { defer h.Unlock() var ( - gvrs = map[schema2.GroupVersionResource]bool{} + gvks = map[schema2.GroupVersionKind]bool{} toWait []*watcher ) @@ -140,9 +141,9 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { gvr := attributes.GVR(schema) gvk := attributes.GVK(schema) - gvrs[gvr] = true + gvks[gvk] = true - if h.watchers[gvr] != nil { + if h.watchers[gvk] != nil { continue } @@ -156,19 +157,19 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { gvr: gvr, informer: summaryInformer.Informer(), } - h.watchers[gvr] = w + h.watchers[gvk] = w toWait = append(toWait, w) logrus.Infof("Watching metadata for %s", w.gvk) - h.addResourceEventHandler(w.gvr, w.informer) + h.addResourceEventHandler(w.gvk, w.informer) go w.informer.Run(w.ctx.Done()) } - for gvr, w := range h.watchers { - if !gvrs[gvr] { - logrus.Infof("Stopping metadata watch on %s", gvr) + for gvk, w := range h.watchers { + if !gvks[gvk] { + logrus.Infof("Stopping metadata watch on %s", gvk) w.cancel() - delete(h.watchers, gvr) + delete(h.watchers, gvk) } } @@ -178,7 +179,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { logrus.Errorf("failed to sync cache for %v", w.gvk) cancel() w.cancel() - delete(h.watchers, w.gvr) + delete(h.watchers, w.gvk) } cancel() } @@ -186,11 +187,30 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { return nil } -func (h *clusterCache) List(gvr schema2.GroupVersionResource) []interface{} { +func (h *clusterCache) Get(gvk schema2.GroupVersionKind, namespace, name string) (interface{}, bool, error) { h.RLock() defer h.RUnlock() - w, ok := h.watchers[gvr] + w, ok := h.watchers[gvk] + if !ok { + return nil, false, nil + } + + var key string + if namespace == "" { + key = name + } else { + key = namespace + "/" + name + } + + return w.informer.GetStore().GetByKey(key) +} + +func (h *clusterCache) List(gvk schema2.GroupVersionKind) []interface{} { + h.RLock() + defer h.RUnlock() + + w, ok := h.watchers[gvk] if !ok { return nil } @@ -208,7 +228,7 @@ func (h *clusterCache) start() { event := eventObj.(event) h.RLock() - w := h.watchers[event.gvr] + w := h.watchers[event.gvk] h.RUnlock() if w == nil { h.workqueue.Done(eventObj) @@ -217,17 +237,17 @@ func (h *clusterCache) start() { key := toKey(event.obj) if event.oldObj != nil { - _, err := callAll(h.changeHandlers.List(), event.gvr, key, event.obj, event.oldObj) + _, err := callAll(h.changeHandlers.List(), event.gvk, key, event.obj, event.oldObj) if err != nil { logrus.Errorf("failed to handle add event: %v", err) } } else if event.add { - _, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, nil) + _, err := callAll(h.addHandlers.List(), event.gvk, key, event.obj, nil) if err != nil { logrus.Errorf("failed to handle add event: %v", err) } } else { - _, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj, nil) + _, err := callAll(h.removeHandlers.List(), event.gvk, key, event.obj, nil) if err != nil { logrus.Errorf("failed to handle remove event: %v", err) } @@ -260,7 +280,7 @@ func (h *clusterCache) OnChange(ctx context.Context, handler ChangeHandler) { h.changeHandlers.Add(ctx, handler) } -func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) (runtime.Object, error) { +func callAll(handlers []interface{}, gvr schema2.GroupVersionKind, key string, obj, oldObj runtime.Object) (runtime.Object, error) { var errs []error for _, handler := range handlers { if f, ok := handler.(Handler); ok { diff --git a/pkg/podimpersonation/podimpersonation.go b/pkg/podimpersonation/podimpersonation.go index 40463f0..e726368 100644 --- a/pkg/podimpersonation/podimpersonation.go +++ b/pkg/podimpersonation/podimpersonation.go @@ -51,18 +51,18 @@ func New(key string, cg proxy.ClientGetter, roleTimeout time.Duration, imageName } } -func (s *PodImpersonation) PurgeOldRoles(gvr schema.GroupVersionResource, key string, obj runtime.Object) error { +func (s *PodImpersonation) PurgeOldRoles(gvk schema.GroupVersionKind, key string, obj runtime.Object) error { if obj == nil || - gvr.Version != "v1" || - gvr.Group != rbacv1.GroupName || - gvr.Resource != "clusterroles" { + gvk.Version != "v1" || + gvk.Group != rbacv1.GroupName || + gvk.Kind != "ClusterRole" { return nil } meta, err := meta.Accessor(obj) if err != nil { // ignore error - logrus.Warnf("failed to find metadata for %v, %s", gvr, key) + logrus.Warnf("failed to find metadata for %v, %s", gvk, key) return nil } diff --git a/pkg/resources/counts/counts.go b/pkg/resources/counts/counts.go index 31727a5..a44d44b 100644 --- a/pkg/resources/counts/counts.go +++ b/pkg/resources/counts/counts.go @@ -114,7 +114,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. var ( result = make(chan types.APIEvent, 100) counts map[string]ItemCount - gvrToSchema = map[schema2.GroupVersionResource]*types.APISchema{} + gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{} countLock sync.Mutex ) @@ -133,10 +133,10 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. continue } - gvrToSchema[attributes.GVR(schema)] = schema + gvkToSchema[attributes.GVK(schema)] = schema } - onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj, oldObj runtime.Object) error { + onChange := func(add bool, gvk schema2.GroupVersionKind, _ string, obj, oldObj runtime.Object) error { countLock.Lock() defer countLock.Unlock() @@ -144,7 +144,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return nil } - schema := gvrToSchema[gvr] + schema := gvkToSchema[gvk] if schema == nil { return nil } @@ -195,14 +195,14 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return nil } - s.ccache.OnAdd(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { - return onChange(true, gvr, key, obj, nil) + s.ccache.OnAdd(apiOp.Context(), func(gvk schema2.GroupVersionKind, key string, obj runtime.Object) error { + return onChange(true, gvk, key, obj, nil) }) - s.ccache.OnChange(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error { - return onChange(true, gvr, key, obj, oldObj) + s.ccache.OnChange(apiOp.Context(), func(gvk schema2.GroupVersionKind, key string, obj, oldObj runtime.Object) error { + return onChange(true, gvk, key, obj, oldObj) }) - s.ccache.OnRemove(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { - return onChange(false, gvr, key, obj, nil) + s.ccache.OnRemove(apiOp.Context(), func(gvk schema2.GroupVersionKind, key string, obj runtime.Object) error { + return onChange(false, gvk, key, obj, nil) }) return buffer(result), nil @@ -315,7 +315,7 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count { counts := map[string]ItemCount{} for _, schema := range s.schemasToWatch(apiOp) { - gvr := attributes.GVR(schema) + gvk := attributes.GVK(schema) access, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb) rev := 0 @@ -325,7 +325,7 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count { all := access.Grants("list", "*", "*") - for _, obj := range s.ccache.List(gvr) { + for _, obj := range s.ccache.List(gvk) { name, ns, revision, summary, ok := getInfo(obj) if !ok { continue