1
0
mirror of https://github.com/rancher/steve.git synced 2025-04-28 03:10:32 +00:00
steve/pkg/clustercache/controller.go

294 lines
6.8 KiB
Go
Raw Normal View History

2019-09-09 21:28:55 +00:00
package clustercache
import (
"context"
"sync"
"time"
2019-09-11 21:05:00 +00:00
"github.com/rancher/steve/pkg/attributes"
2020-01-31 05:37:59 +00:00
"github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/schemaserver/types"
2019-09-09 21:28:55 +00:00
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/summary/client"
"github.com/rancher/wrangler/pkg/summary/informer"
2019-09-09 21:28:55 +00:00
"github.com/sirupsen/logrus"
2020-01-31 05:37:59 +00:00
"k8s.io/apimachinery/pkg/api/meta"
2019-09-09 21:28:55 +00:00
"k8s.io/apimachinery/pkg/runtime"
schema2 "k8s.io/apimachinery/pkg/runtime/schema"
2020-03-13 02:14:40 +00:00
"k8s.io/client-go/dynamic"
2019-09-09 21:28:55 +00:00
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error
2020-03-13 02:14:40 +00:00
type ChangeHandler func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error
2019-09-09 21:28:55 +00:00
type ClusterCache interface {
List(gvr schema2.GroupVersionResource) []interface{}
OnAdd(ctx context.Context, handler Handler)
OnRemove(ctx context.Context, handler Handler)
2020-03-13 02:14:40 +00:00
OnChange(ctx context.Context, handler ChangeHandler)
2019-09-09 21:28:55 +00:00
OnSchemas(schemas *schema.Collection) error
}
type event struct {
2020-03-13 02:14:40 +00:00
add bool
gvr schema2.GroupVersionResource
obj runtime.Object
oldObj runtime.Object
2019-09-09 21:28:55 +00:00
}
type watcher struct {
ctx context.Context
cancel func()
informer cache.SharedIndexInformer
gvk schema2.GroupVersionKind
gvr schema2.GroupVersionResource
start bool
}
type clusterCache struct {
sync.RWMutex
ctx context.Context
typed map[schema2.GroupVersionKind]cache.SharedIndexInformer
informerFactory informer.SummarySharedInformerFactory
2019-09-09 21:28:55 +00:00
controllerFactory generic.ControllerManager
watchers map[schema2.GroupVersionResource]*watcher
workqueue workqueue.DelayingInterface
addHandlers cancelCollection
removeHandlers cancelCollection
changeHandlers cancelCollection
}
func NewClusterCache(ctx context.Context, dynamicClient dynamic.Interface) ClusterCache {
2019-09-09 21:28:55 +00:00
c := &clusterCache{
ctx: ctx,
typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{},
informerFactory: informer.NewSummarySharedInformerFactory(client.NewForDynamicClient(dynamicClient), 2*time.Hour),
2019-09-09 21:28:55 +00:00
watchers: map[schema2.GroupVersionResource]*watcher{},
workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"),
}
go c.start()
return c
}
2020-01-31 05:37:59 +00:00
func validSchema(schema *types.APISchema) bool {
2019-09-09 21:28:55 +00:00
canList := false
canWatch := false
for _, verb := range attributes.Verbs(schema) {
switch verb {
case "list":
canList = true
case "watch":
canWatch = true
}
}
if !canList || !canWatch {
return false
}
return true
}
func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, informer cache.SharedIndexInformer) {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if rObj, ok := obj.(runtime.Object); ok {
h.workqueue.Add(event{
add: true,
obj: rObj,
gvr: gvr,
})
}
},
2020-03-13 02:14:40 +00:00
UpdateFunc: func(oldObj, newObj interface{}) {
if rObj, ok := newObj.(runtime.Object); ok {
if rOldObj, ok := oldObj.(runtime.Object); ok {
h.workqueue.Add(event{
obj: rObj,
oldObj: rOldObj,
gvr: gvr,
})
}
}
},
2019-09-09 21:28:55 +00:00
DeleteFunc: func(obj interface{}) {
if rObj, ok := obj.(runtime.Object); ok {
h.workqueue.Add(event{
obj: rObj,
gvr: gvr,
})
}
},
})
}
func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
h.Lock()
defer h.Unlock()
var (
toStart = map[schema2.GroupVersionResource]*watcher{}
gvrs = map[schema2.GroupVersionResource]bool{}
)
for _, id := range schemas.IDs() {
schema := schemas.Schema(id)
if !validSchema(schema) {
continue
}
gvr := attributes.GVR(schema)
gvk := attributes.GVK(schema)
gvrs[gvr] = true
w := h.watchers[gvr]
if w != nil {
continue
}
ctx, cancel := context.WithCancel(h.ctx)
w = &watcher{
ctx: ctx,
cancel: cancel,
gvk: gvk,
gvr: gvr,
informer: h.typed[gvk],
}
toStart[gvr] = w
if w.informer == nil {
w.informer = h.informerFactory.ForResource(gvr).Informer()
w.start = true
h.addResourceEventHandler(gvr, w.informer)
2019-09-09 21:28:55 +00:00
}
}
for gvr, w := range h.watchers {
if !gvrs[gvr] {
2020-02-08 20:04:20 +00:00
logrus.Infof("Stopping metadata watch on %s", gvr)
2019-09-09 21:28:55 +00:00
w.cancel()
delete(h.watchers, gvr)
}
}
var toWait []*watcher
2019-09-09 21:28:55 +00:00
for _, w := range toStart {
if !w.start {
continue
}
w.start = false
logrus.Infof("Watching metadata for %s", w.gvk)
2019-09-09 21:28:55 +00:00
go w.informer.Run(w.ctx.Done())
toWait = append(toWait, w)
2019-09-09 21:28:55 +00:00
}
for _, w := range toWait {
2019-09-09 21:28:55 +00:00
cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced)
}
var errs []error
for _, w := range toStart {
if err := h.controllerFactory.EnsureStart(w.ctx, w.gvk, 5); err != nil {
errs = append(errs, err)
}
h.watchers[w.gvr] = w
}
return merr.NewErrors(errs...)
}
func (h *clusterCache) List(gvr schema2.GroupVersionResource) []interface{} {
h.RLock()
defer h.RUnlock()
w, ok := h.watchers[gvr]
if !ok {
return nil
}
return w.informer.GetStore().List()
}
func (h *clusterCache) start() {
for {
eventObj, ok := h.workqueue.Get()
if ok {
break
}
event := eventObj.(event)
2020-02-03 18:30:34 +00:00
h.RLock()
2019-09-09 21:28:55 +00:00
w := h.watchers[event.gvr]
2020-02-03 18:30:34 +00:00
h.RUnlock()
2019-09-09 21:28:55 +00:00
if w == nil {
continue
}
key := toKey(event.obj)
2020-03-13 02:14:40 +00:00
if event.oldObj != nil {
2020-03-14 18:46:20 +00:00
_, err := callAll(h.changeHandlers.List(), event.gvr, key, event.obj, event.oldObj)
2020-03-13 02:14:40 +00:00
if err != nil {
logrus.Errorf("failed to handle add event: %v", err)
}
} else if event.add {
_, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, nil)
2019-09-09 21:28:55 +00:00
if err != nil {
logrus.Errorf("failed to handle add event: %v", err)
}
} else {
2020-03-13 02:14:40 +00:00
_, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj, nil)
2019-09-09 21:28:55 +00:00
if err != nil {
logrus.Errorf("failed to handle remove event: %v", err)
}
}
}
}
func toKey(obj runtime.Object) string {
meta, err := meta.Accessor(obj)
if err != nil {
return ""
}
ns := meta.GetNamespace()
if ns == "" {
return meta.GetName()
}
return ns + "/" + meta.GetName()
}
func (h *clusterCache) OnAdd(ctx context.Context, handler Handler) {
h.addHandlers.Add(ctx, handler)
}
func (h *clusterCache) OnRemove(ctx context.Context, handler Handler) {
h.removeHandlers.Add(ctx, handler)
}
2020-03-13 02:14:40 +00:00
func (h *clusterCache) OnChange(ctx context.Context, handler ChangeHandler) {
2019-09-09 21:28:55 +00:00
h.changeHandlers.Add(ctx, handler)
}
2020-03-13 02:14:40 +00:00
func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) (runtime.Object, error) {
2019-09-09 21:28:55 +00:00
var errs []error
for _, handler := range handlers {
2020-03-13 02:14:40 +00:00
if f, ok := handler.(Handler); ok {
if err := f(gvr, key, obj); err != nil {
errs = append(errs, err)
}
}
if f, ok := handler.(ChangeHandler); ok {
if err := f(gvr, key, obj, oldObj); err != nil {
errs = append(errs, err)
}
2019-09-09 21:28:55 +00:00
}
}
return obj, merr.NewErrors(errs...)
}