1
0
mirror of https://github.com/rancher/steve.git synced 2025-05-09 00:17:14 +00:00

Switch clustercache to index by gvk not gvr

This commit is contained in:
Darren Shepherd 2020-10-23 13:57:57 -07:00
parent e29561cbf9
commit a44863b331
3 changed files with 64 additions and 44 deletions
pkg
clustercache
podimpersonation
resources/counts

View File

@@ -21,11 +21,12 @@ import (
"k8s.io/client-go/util/workqueue"
)
type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error
type ChangeHandler func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error
type Handler func(gvr schema2.GroupVersionKind, key string, obj runtime.Object) error
type ChangeHandler func(gvr schema2.GroupVersionKind, key string, obj, oldObj runtime.Object) error
type ClusterCache interface {
List(gvr schema2.GroupVersionResource) []interface{}
Get(gvk schema2.GroupVersionKind, namespace, name string) (interface{}, bool, error)
List(gvk schema2.GroupVersionKind) []interface{}
OnAdd(ctx context.Context, handler Handler)
OnRemove(ctx context.Context, handler Handler)
OnChange(ctx context.Context, handler ChangeHandler)
@@ -34,7 +35,7 @@ type ClusterCache interface {
type event struct {
add bool
gvr schema2.GroupVersionResource
gvk schema2.GroupVersionKind
obj runtime.Object
oldObj runtime.Object
}
@@ -52,7 +53,7 @@ type clusterCache struct {
ctx context.Context
summaryClient client.Interface
watchers map[schema2.GroupVersionResource]*watcher
watchers map[schema2.GroupVersionKind]*watcher
workqueue workqueue.DelayingInterface
addHandlers cancelCollection
@@ -64,7 +65,7 @@ func NewClusterCache(ctx context.Context, dynamicClient dynamic.Interface) Clust
c := &clusterCache{
ctx: ctx,
summaryClient: client.NewForDynamicClient(dynamicClient),
watchers: map[schema2.GroupVersionResource]*watcher{},
watchers: map[schema2.GroupVersionKind]*watcher{},
workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"),
}
go c.start()
@@ -90,14 +91,14 @@ func validSchema(schema *types.APISchema) bool {
return true
}
func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, informer cache.SharedIndexInformer) {
func (h *clusterCache) addResourceEventHandler(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if rObj, ok := obj.(runtime.Object); ok {
h.workqueue.Add(event{
add: true,
obj: rObj,
gvr: gvr,
gvk: gvk,
})
}
},
@@ -107,7 +108,7 @@ func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource,
h.workqueue.Add(event{
obj: rObj,
oldObj: rOldObj,
gvr: gvr,
gvk: gvk,
})
}
}
@@ -116,7 +117,7 @@ func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource,
if rObj, ok := obj.(runtime.Object); ok {
h.workqueue.Add(event{
obj: rObj,
gvr: gvr,
gvk: gvk,
})
}
},
@@ -128,7 +129,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
defer h.Unlock()
var (
gvrs = map[schema2.GroupVersionResource]bool{}
gvks = map[schema2.GroupVersionKind]bool{}
toWait []*watcher
)
@@ -140,9 +141,9 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
gvr := attributes.GVR(schema)
gvk := attributes.GVK(schema)
gvrs[gvr] = true
gvks[gvk] = true
if h.watchers[gvr] != nil {
if h.watchers[gvk] != nil {
continue
}
@@ -156,19 +157,19 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
gvr: gvr,
informer: summaryInformer.Informer(),
}
h.watchers[gvr] = w
h.watchers[gvk] = w
toWait = append(toWait, w)
logrus.Infof("Watching metadata for %s", w.gvk)
h.addResourceEventHandler(w.gvr, w.informer)
h.addResourceEventHandler(w.gvk, w.informer)
go w.informer.Run(w.ctx.Done())
}
for gvr, w := range h.watchers {
if !gvrs[gvr] {
logrus.Infof("Stopping metadata watch on %s", gvr)
for gvk, w := range h.watchers {
if !gvks[gvk] {
logrus.Infof("Stopping metadata watch on %s", gvk)
w.cancel()
delete(h.watchers, gvr)
delete(h.watchers, gvk)
}
}
@@ -178,7 +179,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
logrus.Errorf("failed to sync cache for %v", w.gvk)
cancel()
w.cancel()
delete(h.watchers, w.gvr)
delete(h.watchers, w.gvk)
}
cancel()
}
@@ -186,11 +187,30 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
return nil
}
func (h *clusterCache) List(gvr schema2.GroupVersionResource) []interface{} {
func (h *clusterCache) Get(gvk schema2.GroupVersionKind, namespace, name string) (interface{}, bool, error) {
h.RLock()
defer h.RUnlock()
w, ok := h.watchers[gvr]
w, ok := h.watchers[gvk]
if !ok {
return nil, false, nil
}
var key string
if namespace == "" {
key = name
} else {
key = namespace + "/" + name
}
return w.informer.GetStore().GetByKey(key)
}
func (h *clusterCache) List(gvk schema2.GroupVersionKind) []interface{} {
h.RLock()
defer h.RUnlock()
w, ok := h.watchers[gvk]
if !ok {
return nil
}
@@ -208,7 +228,7 @@ func (h *clusterCache) start() {
event := eventObj.(event)
h.RLock()
w := h.watchers[event.gvr]
w := h.watchers[event.gvk]
h.RUnlock()
if w == nil {
h.workqueue.Done(eventObj)
@@ -217,17 +237,17 @@ func (h *clusterCache) start() {
key := toKey(event.obj)
if event.oldObj != nil {
_, err := callAll(h.changeHandlers.List(), event.gvr, key, event.obj, event.oldObj)
_, err := callAll(h.changeHandlers.List(), event.gvk, key, event.obj, event.oldObj)
if err != nil {
logrus.Errorf("failed to handle add event: %v", err)
}
} else if event.add {
_, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, nil)
_, err := callAll(h.addHandlers.List(), event.gvk, key, event.obj, nil)
if err != nil {
logrus.Errorf("failed to handle add event: %v", err)
}
} else {
_, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj, nil)
_, err := callAll(h.removeHandlers.List(), event.gvk, key, event.obj, nil)
if err != nil {
logrus.Errorf("failed to handle remove event: %v", err)
}
@@ -260,7 +280,7 @@ func (h *clusterCache) OnChange(ctx context.Context, handler ChangeHandler) {
h.changeHandlers.Add(ctx, handler)
}
func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) (runtime.Object, error) {
func callAll(handlers []interface{}, gvr schema2.GroupVersionKind, key string, obj, oldObj runtime.Object) (runtime.Object, error) {
var errs []error
for _, handler := range handlers {
if f, ok := handler.(Handler); ok {

View File

@@ -51,18 +51,18 @@ func New(key string, cg proxy.ClientGetter, roleTimeout time.Duration, imageName
}
}
func (s *PodImpersonation) PurgeOldRoles(gvr schema.GroupVersionResource, key string, obj runtime.Object) error {
func (s *PodImpersonation) PurgeOldRoles(gvk schema.GroupVersionKind, key string, obj runtime.Object) error {
if obj == nil ||
gvr.Version != "v1" ||
gvr.Group != rbacv1.GroupName ||
gvr.Resource != "clusterroles" {
gvk.Version != "v1" ||
gvk.Group != rbacv1.GroupName ||
gvk.Kind != "ClusterRole" {
return nil
}
meta, err := meta.Accessor(obj)
if err != nil {
// ignore error
logrus.Warnf("failed to find metadata for %v, %s", gvr, key)
logrus.Warnf("failed to find metadata for %v, %s", gvk, key)
return nil
}

View File

@@ -114,7 +114,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
var (
result = make(chan types.APIEvent, 100)
counts map[string]ItemCount
gvrToSchema = map[schema2.GroupVersionResource]*types.APISchema{}
gvkToSchema = map[schema2.GroupVersionKind]*types.APISchema{}
countLock sync.Mutex
)
@@ -133,10 +133,10 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
continue
}
gvrToSchema[attributes.GVR(schema)] = schema
gvkToSchema[attributes.GVK(schema)] = schema
}
onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj, oldObj runtime.Object) error {
onChange := func(add bool, gvk schema2.GroupVersionKind, _ string, obj, oldObj runtime.Object) error {
countLock.Lock()
defer countLock.Unlock()
@@ -144,7 +144,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return nil
}
schema := gvrToSchema[gvr]
schema := gvkToSchema[gvk]
if schema == nil {
return nil
}
@@ -195,14 +195,14 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return nil
}
s.ccache.OnAdd(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error {
return onChange(true, gvr, key, obj, nil)
s.ccache.OnAdd(apiOp.Context(), func(gvk schema2.GroupVersionKind, key string, obj runtime.Object) error {
return onChange(true, gvk, key, obj, nil)
})
s.ccache.OnChange(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error {
return onChange(true, gvr, key, obj, oldObj)
s.ccache.OnChange(apiOp.Context(), func(gvk schema2.GroupVersionKind, key string, obj, oldObj runtime.Object) error {
return onChange(true, gvk, key, obj, oldObj)
})
s.ccache.OnRemove(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error {
return onChange(false, gvr, key, obj, nil)
s.ccache.OnRemove(apiOp.Context(), func(gvk schema2.GroupVersionKind, key string, obj runtime.Object) error {
return onChange(false, gvk, key, obj, nil)
})
return buffer(result), nil
@@ -315,7 +315,7 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count {
counts := map[string]ItemCount{}
for _, schema := range s.schemasToWatch(apiOp) {
gvr := attributes.GVR(schema)
gvk := attributes.GVK(schema)
access, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb)
rev := 0
@@ -325,7 +325,7 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count {
all := access.Grants("list", "*", "*")
for _, obj := range s.ccache.List(gvr) {
for _, obj := range s.ccache.List(gvk) {
name, ns, revision, summary, ok := getInfo(obj)
if !ok {
continue