From f81721ef93a994d0d973df9649e703742b2aa7c1 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 9 Sep 2019 14:28:55 -0700 Subject: [PATCH] Improve counts and columns --- main.go | 15 ++ pkg/client/factory.go | 4 + pkg/clustercache/cancel_collection.go | 29 ++ pkg/clustercache/controller.go | 286 ++++++++++++++++++++ pkg/controllers/helmrelease/handler.go | 34 +++ pkg/controllers/schema/schemas.go | 27 +- pkg/proxy/proxy.go | 2 - pkg/resources/common/defaultcolumns.go | 29 +- pkg/resources/core/resources.go | 173 ++++++++++++ pkg/resources/counts/counts.go | 327 +++++++++++------------ pkg/resources/helmrelease/convert.go | 64 +++++ pkg/resources/helmrelease/helmrelease.go | 36 +++ pkg/resources/helmrelease/store.go | 62 +++++ pkg/resources/schema.go | 17 +- pkg/resources/schema/collection.go | 60 +++-- pkg/resources/schema/factory.go | 5 + pkg/server/server.go | 36 ++- pkg/table/mapper.go | 10 +- 18 files changed, 1001 insertions(+), 215 deletions(-) create mode 100644 pkg/clustercache/cancel_collection.go create mode 100644 pkg/clustercache/controller.go create mode 100644 pkg/controllers/helmrelease/handler.go create mode 100644 pkg/resources/core/resources.go create mode 100644 pkg/resources/helmrelease/convert.go create mode 100644 pkg/resources/helmrelease/helmrelease.go create mode 100644 pkg/resources/helmrelease/store.go diff --git a/main.go b/main.go index 987c2e3..f76873b 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "os" "github.com/rancher/naok/pkg/server" @@ -9,6 +10,7 @@ import ( "github.com/rancher/wrangler/pkg/signals" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "k8s.io/klog" ) var ( @@ -49,8 +51,21 @@ func main() { } func run(c *cli.Context) error { + logging := flag.NewFlagSet("", flag.PanicOnError) + klog.InitFlags(logging) if c.Bool("debug") { logrus.SetLevel(logrus.DebugLevel) + if err := logging.Parse([]string{ + "-v=7", + }); err != nil { + return err + } + } else { + if err := logging.Parse([]string{ + "-v=0", + }); err != nil { + return err + } } ctx := signals.SetupSignalHandler(context.Background()) return server.Run(ctx, config) diff --git a/pkg/client/factory.go b/pkg/client/factory.go index 3186b1d..6bef8f7 100644 --- a/pkg/client/factory.go +++ b/pkg/client/factory.go @@ -24,6 +24,10 @@ func NewFactory(cfg *rest.Config) (*Factory, error) { }, nil } +func (p *Factory) DynamicClient() dynamic.Interface { + return p.client +} + func (p *Factory) Client(ctx *types.APIRequest, s *types.Schema) (dynamic.ResourceInterface, error) { gvr := attributes.GVR(s) if len(ctx.Namespaces) > 0 { diff --git a/pkg/clustercache/cancel_collection.go b/pkg/clustercache/cancel_collection.go new file mode 100644 index 0000000..d0cf7e5 --- /dev/null +++ b/pkg/clustercache/cancel_collection.go @@ -0,0 +1,29 @@ +package clustercache + +import ( + "context" + "sync" + "sync/atomic" +) + +type cancelCollection struct { + id int64 + items sync.Map +} + +func (c *cancelCollection) Add(ctx context.Context, obj interface{}) { + key := atomic.AddInt64(&c.id, 1) + c.items.Store(key, obj) + go func() { + <-ctx.Done() + c.items.Delete(key) + }() +} + +func (c *cancelCollection) List() (result []interface{}) { + c.items.Range(func(key, value interface{}) bool { + result = append(result, value) + return true + }) + return +} diff --git a/pkg/clustercache/controller.go b/pkg/clustercache/controller.go new file mode 100644 index 0000000..f1133ad --- /dev/null +++ b/pkg/clustercache/controller.go @@ -0,0 +1,286 @@ +package clustercache + +import ( + "context" + "fmt" + "sync" + "time" + + meta "k8s.io/apimachinery/pkg/api/meta" + + "github.com/rancher/naok/pkg/attributes" + "github.com/rancher/naok/pkg/resources/schema" + "github.com/rancher/norman/pkg/types" + "github.com/rancher/wrangler/pkg/generic" + "github.com/rancher/wrangler/pkg/merr" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime" + schema2 "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +var ( + logOnce = sync.Once{} +) + +type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error + +type ClusterCache interface { + AddController(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) + List(gvr schema2.GroupVersionResource) []interface{} + OnAdd(ctx context.Context, handler Handler) + OnRemove(ctx context.Context, handler Handler) + OnChange(ctx context.Context, handler Handler) + OnSchemas(schemas *schema.Collection) error +} + +type event struct { + add bool + gvr schema2.GroupVersionResource + obj runtime.Object +} + +type watcher struct { + ctx context.Context + cancel func() + informer cache.SharedIndexInformer + gvk schema2.GroupVersionKind + gvr schema2.GroupVersionResource + start bool +} + +type clusterCache struct { + sync.RWMutex + + ctx context.Context + typed map[schema2.GroupVersionKind]cache.SharedIndexInformer + informerFactory dynamicinformer.DynamicSharedInformerFactory + controllerFactory generic.ControllerManager + watchers map[schema2.GroupVersionResource]*watcher + workqueue workqueue.DelayingInterface + + addHandlers cancelCollection + removeHandlers cancelCollection + changeHandlers cancelCollection +} + +func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache { + c := &clusterCache{ + ctx: ctx, + typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{}, + informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, 2*time.Hour), + watchers: map[schema2.GroupVersionResource]*watcher{}, + workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), + } + go c.start() + return c +} + +func (h *clusterCache) AddController(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) { + h.typed[gvk] = informer +} + +func validSchema(schema *types.Schema) bool { + canList := false + canWatch := false + for _, verb := range attributes.Verbs(schema) { + switch verb { + case "list": + canList = true + case "watch": + canWatch = true + } + } + + if !canList || !canWatch { + return false + } + + if attributes.PreferredVersion(schema) != "" { + return false + } + + if attributes.PreferredGroup(schema) != "" { + return false + } + + return true +} + +func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, informer cache.SharedIndexInformer) { + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if rObj, ok := obj.(runtime.Object); ok { + h.workqueue.Add(event{ + add: true, + obj: rObj, + gvr: gvr, + }) + } + }, + DeleteFunc: func(obj interface{}) { + if rObj, ok := obj.(runtime.Object); ok { + h.workqueue.Add(event{ + obj: rObj, + gvr: gvr, + }) + } + }, + }) +} + +func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { + h.Lock() + defer h.Unlock() + + var ( + toStart = map[schema2.GroupVersionResource]*watcher{} + gvrs = map[schema2.GroupVersionResource]bool{} + ) + + for _, id := range schemas.IDs() { + schema := schemas.Schema(id) + if !validSchema(schema) { + continue + } + + gvr := attributes.GVR(schema) + gvk := attributes.GVK(schema) + gvrs[gvr] = true + + w := h.watchers[gvr] + if w != nil { + continue + } + + ctx, cancel := context.WithCancel(h.ctx) + w = &watcher{ + ctx: ctx, + cancel: cancel, + gvk: gvk, + gvr: gvr, + informer: h.typed[gvk], + } + toStart[gvr] = w + + if w.informer == nil { + w.informer = h.informerFactory.ForResource(gvr).Informer() + w.start = true + } + + logrus.Infof("Watching counts for %s", gvk.String()) + h.addResourceEventHandler(gvr, w.informer) + name := fmt.Sprintf("meta %s", gvk) + h.controllerFactory.AddHandler(ctx, gvk, w.informer, name, func(key string, obj runtime.Object) (object runtime.Object, e error) { + return callAll(h.changeHandlers.List(), gvr, key, obj) + }) + } + + for gvr, w := range h.watchers { + if !gvrs[gvr] { + logrus.Infof("Stopping count watch on %s", gvr) + w.cancel() + delete(h.watchers, gvr) + } + } + + for _, w := range toStart { + if !w.start { + continue + } + go w.informer.Run(w.ctx.Done()) + } + + for _, w := range toStart { + cache.WaitForCacheSync(w.ctx.Done(), w.informer.HasSynced) + } + + var errs []error + for _, w := range toStart { + if err := h.controllerFactory.EnsureStart(w.ctx, w.gvk, 5); err != nil { + errs = append(errs, err) + } + h.watchers[w.gvr] = w + } + + return merr.NewErrors(errs...) +} + +func (h *clusterCache) List(gvr schema2.GroupVersionResource) []interface{} { + h.RLock() + defer h.RUnlock() + + w, ok := h.watchers[gvr] + if !ok { + return nil + } + + return w.informer.GetStore().List() +} + +func (h *clusterCache) start() { + for { + eventObj, ok := h.workqueue.Get() + if ok { + break + } + + event := eventObj.(event) + w := h.watchers[event.gvr] + if w == nil { + continue + } + + key := toKey(event.obj) + if event.add { + _, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj) + if err != nil { + logrus.Errorf("failed to handle add event: %v", err) + } + } else { + _, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj) + if err != nil { + logrus.Errorf("failed to handle remove event: %v", err) + } + } + } +} + +func toKey(obj runtime.Object) string { + meta, err := meta.Accessor(obj) + if err != nil { + return "" + } + ns := meta.GetNamespace() + if ns == "" { + return meta.GetName() + } + return ns + "/" + meta.GetName() +} + +func (h *clusterCache) OnAdd(ctx context.Context, handler Handler) { + h.addHandlers.Add(ctx, handler) +} + +func (h *clusterCache) OnRemove(ctx context.Context, handler Handler) { + h.removeHandlers.Add(ctx, handler) +} + +func (h *clusterCache) OnChange(ctx context.Context, handler Handler) { + h.changeHandlers.Add(ctx, handler) +} + +func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj runtime.Object) (runtime.Object, error) { + var errs []error + for _, handler := range handlers { + f := handler.(Handler) + if err := f(gvr, key, obj); err != nil { + errs = append(errs, err) + } + } + + return obj, merr.NewErrors(errs...) +} diff --git a/pkg/controllers/helmrelease/handler.go b/pkg/controllers/helmrelease/handler.go new file mode 100644 index 0000000..20814af --- /dev/null +++ b/pkg/controllers/helmrelease/handler.go @@ -0,0 +1,34 @@ +package helmrelease + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type HelmRelease struct { +} + +type handler struct { + releases map[string]HelmRelease +} + +func (h *handler) OnConfigMapChange(key string, obj *v1.ConfigMap) (*v1.ConfigMap, error) { + if !h.isRelease(obj) { + return obj, nil + } + +} + +func (h *handler) OnSecretChange(key string, obj *v1.Secret) (*v1.Secret, error) { + if !h.isRelease(obj) { + return obj, nil + } + +} + +func (n *handler) isRelease(obj metav1.Object) bool { + if obj == nil { + return false + } + return obj.GetLabels()["OWNER"] == "TILLER" +} diff --git a/pkg/controllers/schema/schemas.go b/pkg/controllers/schema/schemas.go index b028b79..d75824a 100644 --- a/pkg/controllers/schema/schemas.go +++ b/pkg/controllers/schema/schemas.go @@ -16,6 +16,10 @@ import ( apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" ) +type SchemasHandler interface { + OnSchemas(schemas *schema2.Collection) error +} + type handler struct { sync.Mutex @@ -23,33 +27,43 @@ type handler struct { schemas *schema2.Collection client discovery.DiscoveryInterface crd apiextcontrollerv1beta1.CustomResourceDefinitionClient + handler SchemasHandler } func Register(ctx context.Context, discovery discovery.DiscoveryInterface, crd apiextcontrollerv1beta1.CustomResourceDefinitionController, apiService v1.APIServiceController, - schemas *schema2.Collection) { + schemasHandler SchemasHandler, + schemas *schema2.Collection) (init func() error) { h := &handler{ client: discovery, schemas: schemas, + handler: schemasHandler, crd: crd, } apiService.OnChange(ctx, "schema", h.OnChangeAPIService) crd.OnChange(ctx, "schema", h.OnChangeCRD) + + return func() error { + h.queueRefresh() + return h.refreshAll() + } } func (h *handler) OnChangeCRD(key string, crd *v1beta1.CustomResourceDefinition) (*v1beta1.CustomResourceDefinition, error) { - return crd, h.queueRefresh() + h.queueRefresh() + return crd, nil } func (h *handler) OnChangeAPIService(key string, api *apiv1.APIService) (*apiv1.APIService, error) { - return api, h.queueRefresh() + h.queueRefresh() + return api, nil } -func (h *handler) queueRefresh() error { +func (h *handler) queueRefresh() { atomic.StoreInt32(&h.toSync, 1) go func() { @@ -59,8 +73,6 @@ func (h *handler) queueRefresh() error { atomic.StoreInt32(&h.toSync, 1) } }() - - return nil } func (h *handler) refreshAll() error { @@ -78,6 +90,9 @@ func (h *handler) refreshAll() error { } h.schemas.Reset(schemas) + if h.handler != nil { + return h.handler.OnSchemas(h.schemas) + } return nil } diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index e97044c..c38b8ff 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -1,7 +1,6 @@ package proxy import ( - "fmt" "net" "net/http" "net/url" @@ -85,7 +84,6 @@ func prependPath(prefix string, h http.Handler) http.Handler { // regexps will work.) func stripLeaveSlash(prefix string, h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - fmt.Println(req.Method, req.URL.Path) p := strings.TrimPrefix(req.URL.Path, prefix) if len(p) >= len(req.URL.Path) { http.NotFound(w, req) diff --git a/pkg/resources/common/defaultcolumns.go b/pkg/resources/common/defaultcolumns.go index 40393e7..c4fc60a 100644 --- a/pkg/resources/common/defaultcolumns.go +++ b/pkg/resources/common/defaultcolumns.go @@ -6,6 +6,21 @@ import ( "github.com/rancher/norman/pkg/types" ) +var ( + NameColumn = table.Column{ + Name: "Name", + Field: "metadata.name", + Type: "string", + Format: "name", + } + CreatedColumn = table.Column{ + Name: "Created", + Field: "metadata.creationTimestamp", + Type: "string", + Format: "date", + } +) + type DefaultColumns struct { types.EmptyMapper } @@ -13,18 +28,8 @@ type DefaultColumns struct { func (d *DefaultColumns) ModifySchema(schema *types.Schema, schemas *types.Schemas) error { if attributes.Columns(schema) == nil { attributes.SetColumns(schema, []table.Column{ - { - Name: "Name", - Field: "metadata.name", - Type: "string", - Format: "name", - }, - { - Name: "Created", - Field: "metadata.creationTimestamp", - Type: "string", - Format: "date", - }, + NameColumn, + CreatedColumn, }) } diff --git a/pkg/resources/core/resources.go b/pkg/resources/core/resources.go new file mode 100644 index 0000000..4d5ee09 --- /dev/null +++ b/pkg/resources/core/resources.go @@ -0,0 +1,173 @@ +package core + +import ( + "fmt" + "strings" + + "github.com/rancher/naok/pkg/resources/common" + "github.com/rancher/naok/pkg/resources/schema" + "github.com/rancher/naok/pkg/table" + "github.com/rancher/norman/pkg/data" + "github.com/rancher/norman/pkg/types/convert" + schema2 "k8s.io/apimachinery/pkg/runtime/schema" +) + +func Register(collection *schema.Collection) { + collection.AddTemplate(&schema.Template{ + Kind: "ConfigMap", + ComputedColumns: func(obj data.Object) { + var fields []string + for field := range obj.Map("data") { + fields = append(fields, field) + } + + obj.SetNested(len(obj.Map("data")), "metadata", "computed", "data") + obj.SetNested(fields, "metadata", "computed", "fields") + }, + Columns: []table.Column{ + common.NameColumn, + { + Name: "Data", + Field: "metadata.computed.data", + Type: "int", + }, + { + Name: "Fields", + Field: "metadata.computed.fields", + Type: "array[string]", + }, + common.CreatedColumn, + }, + }) + + collection.AddTemplate(&schema.Template{ + Group: "apps", + Kind: "ControllerRevision", + ComputedColumns: func(obj data.Object) { + for _, owner := range obj.Map("metadata").Slice("ownerReferences") { + if owner.Bool("controller") { + obj.SetNested(getReference(collection, obj, owner), "metadata", "computed", "controller") + } + } + }, + Columns: []table.Column{ + common.NameColumn, + { + Name: "Controller", + Field: "metadata.computed.controller", + Type: "reference", + }, + { + Name: "Revision", + Field: "revision", + Type: "int", + }, + common.CreatedColumn, + }, + }) + + collection.AddTemplate(&schema.Template{ + Group: "apps", + Kind: "DaemonSet", + Columns: []table.Column{ + common.NameColumn, + { + Name: "Desired", + Field: "status.desiredNumberScheduled", + Type: "int", + }, + { + Name: "Current", + Field: "status.currentNumberScheduled", + Type: "int", + }, + { + Name: "Ready", + Field: "status.numberReady", + Type: "int", + }, + { + Name: "Up-to-date", + Field: "status.updatedNumberScheduled", + Type: "int", + }, + { + Name: "Available", + Field: "status.numberAvailable", + Type: "int", + }, + //{ + // Name: "Node Selector", + // Field: "metadata.computed.nodeSelector", + // Type: "selector", + //}, + common.CreatedColumn, + }, + //ComputedColumns: func(obj data.Object) { + // obj.SetNested(podSelector(obj.String("metadata", "namespace"), obj.Map("spec", "selector")), "metadata", "computed", "nodeSelector") + //}, + }) +} + +func getReference(collection *schema.Collection, obj data.Object, owner data.Object) map[string]interface{} { + apiVersion := owner.String("apiVersion") + kind := owner.String("kind") + name := owner.String("name") + namespace := obj.String("metadata", "namespace") + gvk := schema2.FromAPIVersionAndKind(apiVersion, kind) + typeName := collection.ByGVK(gvk) + id := fmt.Sprintf("%s/%s", namespace, name) + if namespace == "" { + id = name + } + + return map[string]interface{}{ + "id": id, + "type": typeName, + } +} + +type selector struct { + Type string `json:"type,omitempty"` + Namespace string `json:"namespace,omitempty"` + Terms []string `json:"terms,omitempty"` +} + +func podSelector(namespace string, obj data.Object) (result selector) { + result.Type = "core.v1.pod" + result.Namespace = namespace + + for k, v := range obj.Map("matchLabels") { + vStr := convert.ToString(v) + if vStr == "" { + result.Terms = append(result.Terms, k) + } else { + result.Terms = append(result.Terms, fmt.Sprintf("%s=%s", k, v)) + } + } + + for _, term := range obj.Slice("matchExpressions") { + key := term.String("key") + values := term.StringSlice("values") + switch term.String("operator") { + case "In": + if len(values) == 1 { + result.Terms = append(result.Terms, fmt.Sprintf("%s=%s", key, values[0])) + } else { + result.Terms = append(result.Terms, fmt.Sprintf("%s in (%s)", key, strings.Join(values, ","))) + } + case "Not In": + if len(values) == 1 { + result.Terms = append(result.Terms, fmt.Sprintf("%s!=%s", key, values[0])) + } else { + result.Terms = append(result.Terms, fmt.Sprintf("%s notin (%s)", key, strings.Join(values, ","))) + } + case "Exists": + result.Terms = append(result.Terms, key) + case "NotExists": + result.Terms = append(result.Terms, "!"+key) + } + } + + return +} diff --git a/pkg/resources/counts/counts.go b/pkg/resources/counts/counts.go index 0e8a735..eb5ebb7 100644 --- a/pkg/resources/counts/counts.go +++ b/pkg/resources/counts/counts.go @@ -1,18 +1,19 @@ package counts import ( - "context" "net/http" + "strconv" "sync" - "time" - "github.com/rancher/naok/pkg/attributes" + schema2 "k8s.io/apimachinery/pkg/runtime/schema" "github.com/rancher/naok/pkg/accesscontrol" + "github.com/rancher/naok/pkg/attributes" + "github.com/rancher/naok/pkg/clustercache" "github.com/rancher/norman/pkg/store/empty" "github.com/rancher/norman/pkg/types" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" ) var ( @@ -21,14 +22,9 @@ var ( "schema": true, "apiRoot": true, } - slow = map[string]bool{ - "io.k8s.api.management.cattle.io.v3.CatalogTemplateVersion": true, - "io.k8s.api.management.cattle.io.v3.CatalogTemplate": true, - } - listTimeout = 1750 * time.Millisecond ) -func Register(schemas *types.Schemas) { +func Register(schemas *types.Schemas, ccache clustercache.ClusterCache) { schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) { schema.CollectionMethods = []string{http.MethodGet} schema.ResourceMethods = []string{http.MethodGet} @@ -40,137 +36,126 @@ func Register(schemas *types.Schemas) { }, }, } - schema.Store = &Store{} + schema.Store = &Store{ + ccache: ccache, + } }) } type Count struct { ID string `json:"id,omitempty"` - Counts map[string]ItemCount `json:"counts,omitempty"` + Counts map[string]ItemCount `json:"counts"` } type ItemCount struct { Count int `json:"count,omitempty"` Namespaces map[string]int `json:"namespaces,omitempty"` - Revision string `json:"revision,omitempty"` + Revision int `json:"revision,omitempty"` } type Store struct { empty.Store + ccache clustercache.ClusterCache } func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) { - c, err := s.getCount(apiOp, listTimeout, true) - return types.ToAPI(c), err + c := s.getCount(apiOp) + return types.ToAPI(c), nil } func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) { - c, err := s.getCount(apiOp, listTimeout, true) - return types.ToAPI([]interface{}{c}), err + c := s.getCount(apiOp) + return types.ToAPI([]interface{}{c}), nil } func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) { - c, err := s.getCount(apiOp, listTimeout*10, false) - if err != nil { - return nil, err - } + var ( + result = make(chan types.APIEvent, 100) + counts map[string]ItemCount + gvrToSchema = map[schema2.GroupVersionResource]*types.Schema{} + countLock sync.Mutex + ) - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(apiOp.Context()) + counts = s.getCount(apiOp).Counts + for id := range counts { + schema := apiOp.Schemas.Schema(id) + if schema == nil { + continue + } - child := make(chan Count) - for name, countItem := range c.Counts { - wg.Add(1) - name := name - countItem := countItem - go func() { - s.watchItem(apiOp.WithContext(ctx), name, countItem, cancel, child) - wg.Done() - }() + gvrToSchema[attributes.GVR(schema)] = schema } go func() { - wg.Wait() - close(child) + <-apiOp.Context().Done() + close(result) }() - result := make(chan types.APIEvent) - go func() { - defer close(result) + onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj runtime.Object) error { + countLock.Lock() + defer countLock.Unlock() + + schema := gvrToSchema[gvr] + if schema == nil { + return nil + } + + apiObj := apiOp.Filter(nil, schema, types.ToAPI(obj)) + if apiObj.IsNil() { + return nil + } + + _, namespace, revision, ok := getInfo(obj) + if !ok { + return nil + } + + itemCount := counts[schema.ID] + if revision <= itemCount.Revision { + return nil + } + + if add { + itemCount.Count++ + if namespace != "" { + itemCount.Namespaces[namespace]++ + } + } else { + itemCount.Count-- + if namespace != "" { + itemCount.Namespaces[namespace]-- + } + } + + counts[schema.ID] = itemCount + countsCopy := map[string]ItemCount{} + for k, v := range counts { + countsCopy[k] = v + } result <- types.APIEvent{ - Name: "resource.create", - ResourceType: "count", - Object: types.ToAPI(c), + Name: "resource.change", + ResourceType: "counts", + Object: types.ToAPI(Count{ + ID: "count", + Counts: countsCopy, + }), } - for change := range child { - for k, v := range change.Counts { - c.Counts[k] = v - } + return nil + } - result <- types.APIEvent{ - Name: "resource.change", - ResourceType: "count", - Object: types.ToAPI(c), - } - } - }() + s.ccache.OnAdd(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { + return onChange(true, gvr, key, obj) + }) + s.ccache.OnRemove(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { + return onChange(false, gvr, key, nil) + }) return result, nil } -func (s *Store) watchItem(apiOp *types.APIRequest, schemaID string, start ItemCount, cancel func(), counts chan Count) { - schema := apiOp.Schemas.Schema(schemaID) - if schema == nil || schema.Store == nil || apiOp.AccessControl.CanWatch(apiOp, schema) != nil { - return - } - defer cancel() - - logrus.Debugf("watching %s for count", schemaID) - defer logrus.Debugf("close watching %s for count", schemaID) - w, err := schema.Store.Watch(apiOp, schema, types.WatchRequest{Revision: start.Revision}) - if err != nil { - logrus.Errorf("failed to watch %s for counts: %v", schema.ID, err) - return - } - - for event := range w { - if event.Revision == start.Revision { - continue - } - - ns := types.Namespace(event.Object.Map()) - write := false - if event.Name == "resource.create" { - start.Count++ - write = true - if ns != "" { - start.Namespaces[ns]++ - } - } else if event.Name == "resource.remove" { - start.Count-- - write = true - if ns != "" { - start.Namespaces[ns]-- - } - } - if write { - counts <- Count{Counts: map[string]ItemCount{ - schemaID: start, - }} - } - } -} - -func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreSlow bool) (Count, error) { - var countLock sync.Mutex - counts := map[string]ItemCount{} - - errCtx, cancel := context.WithTimeout(apiOp.Context(), timeout) - eg, errCtx := errgroup.WithContext(errCtx) - defer cancel() - +func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.Schema) { for _, schema := range apiOp.Schemas.Schemas() { if ignore[schema.ID] { continue @@ -184,10 +169,6 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreS continue } - if ignoreSlow && slow[schema.ID] { - continue - } - if schema.Store == nil { continue } @@ -196,73 +177,69 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreS continue } - current := schema - eg.Go(func() error { - list, err := current.Store.List(apiOp, current, nil) - if err != nil { - return err - } - if list.IsNil() { - return nil - } - - itemCount := ItemCount{ - Namespaces: map[string]int{}, - Revision: list.ListRevision, - } - - for _, item := range list.List() { - itemCount.Count++ - ns := types.Namespace(item) - if ns != "" { - itemCount.Namespaces[ns]++ - } - } - - countLock.Lock() - counts[current.ID] = itemCount - countLock.Unlock() - - return nil - }) - } - - var ( - err error - ) - - select { - case err = <-future(eg.Wait): - case <-errCtx.Done(): - err = errCtx.Err() - } - - if err != nil && err != context.Canceled && err != context.DeadlineExceeded { - return Count{}, err - } - - // in the case of cancellation go routines could still be running so we copy the map - // to avoid returning a map that might get modified - countLock.Lock() - result := Count{ - ID: "count", - Counts: map[string]ItemCount{}, - } - for k, v := range counts { - result.Counts[k] = v - } - countLock.Unlock() - - return result, nil -} - -func future(f func() error) chan error { - result := make(chan error, 1) - go func() { - defer close(result) - if err := f(); err != nil { - result <- err + if apiOp.AccessControl.CanWatch(apiOp, schema) != nil { + continue } - }() - return result + + result = append(result, schema) + } + + return +} + +func getInfo(obj interface{}) (name string, namespace string, revision int, ok bool) { + r, ok := obj.(runtime.Object) + if !ok { + return "", "", 0, false + } + + meta, err := meta.Accessor(r) + if err != nil { + return "", "", 0, false + } + + revision, err = strconv.Atoi(meta.GetResourceVersion()) + if err != nil { + return "", "", 0, false + } + + return meta.GetName(), meta.GetNamespace(), revision, true +} + +func (s *Store) getCount(apiOp *types.APIRequest) Count { + counts := map[string]ItemCount{} + + for _, schema := range s.schemasToWatch(apiOp) { + gvr := attributes.GVR(schema) + + rev := 0 + itemCount := ItemCount{ + Count: 1, + Namespaces: map[string]int{}, + } + + for _, obj := range s.ccache.List(gvr) { + _, ns, revision, ok := getInfo(obj) + if !ok { + continue + } + + if revision > rev { + rev = revision + } + + itemCount.Count++ + if ns != "" { + itemCount.Namespaces[ns]++ + } + } + + itemCount.Revision = rev + counts[schema.ID] = itemCount + } + + return Count{ + ID: "count", + Counts: counts, + } } diff --git a/pkg/resources/helmrelease/convert.go b/pkg/resources/helmrelease/convert.go new file mode 100644 index 0000000..5392e55 --- /dev/null +++ b/pkg/resources/helmrelease/convert.go @@ -0,0 +1,64 @@ +package helmrelease + +import ( + "encoding/base64" + "fmt" + "strings" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/helm/pkg/proto/hapi/release" +) + +func ToRelease(data, name string) (*HelmRelease, error) { + bytes, err := base64.StdEncoding.DecodeString(data) + if err != nil { + return nil, err + } + + var hr release.Release + if err := proto.Unmarshal(bytes, &hr); err != nil { + return nil, err + } + + if hr.Chart == nil || hr.Chart.Metadata == nil { + return nil, fmt.Errorf("invalid chart, missing chart or metadata") + } + + hrVersion := HelmRelease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: hr.Namespace, + }, + ID: fmt.Sprintf("%s:%s", hr.Namespace, name), + Name: hr.Name, + FirstDeployed: toTime(hr.Info.FirstDeployed), + LastDeployed: toTime(hr.Info.LastDeployed), + Deleted: toTime(hr.Info.Deleted), + Metadata: *hr.Chart.Metadata, + Status: release.Status_Code_name[int32(hr.Info.Status.Code)], + Manifest: hr.Manifest, + Version: hr.Version, + } + + if hr.Info.Status != nil { + hrVersion.Status = release.Status_Code_name[int32(hr.Info.Status.Code)] + for _, template := range hr.Chart.Templates { + if strings.EqualFold("readme.md", template.Name) { + hrVersion.ReadMe = string(template.Data) + } + } + } + + return &hrVersion, nil +} + +func toTime(t *timestamp.Timestamp) *metav1.Time { + if t == nil { + return nil + } + time := metav1.NewTime(time.Unix(t.Seconds, int64(t.Nanos))) + return &time +} diff --git a/pkg/resources/helmrelease/helmrelease.go b/pkg/resources/helmrelease/helmrelease.go new file mode 100644 index 0000000..b2bc232 --- /dev/null +++ b/pkg/resources/helmrelease/helmrelease.go @@ -0,0 +1,36 @@ +package helmrelease + +import ( + "net/http" + + "github.com/rancher/norman/pkg/types" + v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/helm/pkg/proto/hapi/chart" +) + +type HelmRelease struct { + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + chart.Metadata `json:",inline"` + ID string `json:"id,omitempty"` + FirstDeployed *metav1.Time `json:"firstDeployed,omitempty"` + LastDeployed *metav1.Time `json:"lastDeployed,omitempty"` + Deleted *metav1.Time `json:"deleted,omitempty"` + Status string `json:"status,omitempty"` + Manifest string `json:"manifest,omitempty"` + ReadMe string `json:"readMe,omitempty"` + Name string `json:"name,omitempty"` + Version int32 `json:"version,omitempty"` +} + +func Register(schemas *types.Schemas, configMaps v1.ConfigMapClient, secrets v1.SecretClient) { + schemas.MustImportAndCustomize(HelmRelease{}, func(schema *types.Schema) { + schema.CollectionMethods = []string{http.MethodGet} + schema.ResourceMethods = []string{http.MethodGet} + schema.Store = &Store{ + configMaps: configMaps, + secrets: secrets, + } + }) +} diff --git a/pkg/resources/helmrelease/store.go b/pkg/resources/helmrelease/store.go new file mode 100644 index 0000000..d4a47eb --- /dev/null +++ b/pkg/resources/helmrelease/store.go @@ -0,0 +1,62 @@ +package helmrelease + +import ( + "github.com/rancher/norman/pkg/store/empty" + "github.com/rancher/norman/pkg/types" + v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" + "github.com/rancher/wrangler/pkg/kv" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type Store struct { + empty.Store + configMaps v1.ConfigMapClient + secrets v1.SecretClient +} + +func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) { + var ( + data string + namespace, name = kv.Split(id, ":") + ) + + secret, err := s.secrets.Get(namespace, name, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return types.APIObject{}, err + } else if errors.IsNotFound(err) { + secret = nil + } + + if secret == nil { + configMap, err := s.configMaps.Get(apiOp.Namespaces[0], id, metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return types.APIObject{}, err + } + + if configMap == nil { + return types.APIObject{}, nil + } + + data = configMap.Data["release"] + name = configMap.Name + } else { + data = string(secret.Data["release"]) + name = secret.Name + } + + hr, err := ToRelease(data, name) + if err != nil || hr == nil { + return types.APIObject{}, err + } + + return types.ToAPI(hr), nil +} + +//func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) { +// +//} +// +//func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) { +// +//} diff --git a/pkg/resources/schema.go b/pkg/resources/schema.go index 5eac5c7..56bd26d 100644 --- a/pkg/resources/schema.go +++ b/pkg/resources/schema.go @@ -2,25 +2,38 @@ package resources import ( "github.com/rancher/naok/pkg/accesscontrol" + "github.com/rancher/naok/pkg/clustercache" "github.com/rancher/naok/pkg/resources/apigroups" "github.com/rancher/naok/pkg/resources/common" + "github.com/rancher/naok/pkg/resources/core" "github.com/rancher/naok/pkg/resources/counts" + "github.com/rancher/naok/pkg/resources/helmrelease" "github.com/rancher/naok/pkg/resources/schema" "github.com/rancher/norman/pkg/store/apiroot" "github.com/rancher/norman/pkg/store/proxy" "github.com/rancher/norman/pkg/subscribe" "github.com/rancher/norman/pkg/types" + corev1controller "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" "k8s.io/client-go/kubernetes" ) -func SchemaFactory(getter proxy.ClientGetter, as *accesscontrol.AccessStore, k8s kubernetes.Interface) *schema.Collection { +func SchemaFactory(getter proxy.ClientGetter, + as *accesscontrol.AccessStore, + k8s kubernetes.Interface, + ccache clustercache.ClusterCache, + configMaps corev1controller.ConfigMapClient, + secrets corev1controller.SecretClient, +) *schema.Collection { baseSchema := types.EmptySchemas() collection := schema.NewCollection(baseSchema, as) - counts.Register(baseSchema) + core.Register(collection) + + counts.Register(baseSchema, ccache) subscribe.Register(baseSchema) apigroups.Register(baseSchema, k8s.Discovery()) apiroot.Register(baseSchema, []string{"v1"}, []string{"proxy:/apis"}) + helmrelease.Register(baseSchema, configMaps, secrets) common.Register(collection, getter) diff --git a/pkg/resources/schema/collection.go b/pkg/resources/schema/collection.go index 91aabd2..9952e98 100644 --- a/pkg/resources/schema/collection.go +++ b/pkg/resources/schema/collection.go @@ -5,7 +5,10 @@ import ( "github.com/rancher/naok/pkg/accesscontrol" "github.com/rancher/naok/pkg/attributes" + "github.com/rancher/naok/pkg/table" + "github.com/rancher/norman/pkg/data" "github.com/rancher/norman/pkg/types" + "github.com/rancher/wrangler/pkg/name" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/authentication/user" ) @@ -21,19 +24,22 @@ type Collection struct { schemas map[string]*types.Schema templates map[string]*Template byGVR map[schema.GroupVersionResource]string + byGVK map[schema.GroupVersionKind]string as *accesscontrol.AccessStore } type Template struct { - Group string - Kind string - ID string - RegisterType interface{} - Customize func(*types.Schema) - Formatter types.Formatter - Store types.Store - Mapper types.Mapper + Group string + Kind string + ID string + RegisterType interface{} + Customize func(*types.Schema) + Formatter types.Formatter + Store types.Store + Mapper types.Mapper + Columns []table.Column + ComputedColumns func(data.Object) } func NewCollection(baseSchema *types.Schemas, access *accesscontrol.AccessStore) *Collection { @@ -42,36 +48,60 @@ func NewCollection(baseSchema *types.Schemas, access *accesscontrol.AccessStore) schemas: map[string]*types.Schema{}, templates: map[string]*Template{}, byGVR: map[schema.GroupVersionResource]string{}, + byGVK: map[schema.GroupVersionKind]string{}, as: access, } } func (c *Collection) Reset(schemas map[string]*types.Schema) { byGVR := map[schema.GroupVersionResource]string{} + byGVK := map[schema.GroupVersionKind]string{} for _, s := range schemas { gvr := attributes.GVR(s) if gvr.Resource != "" { - gvr.Resource = strings.ToLower(gvr.Resource) byGVR[gvr] = s.ID } - - kind := attributes.Kind(s) - if kind != "" { - gvr.Resource = strings.ToLower(kind) - byGVR[gvr] = s.ID + gvk := attributes.GVK(s) + if gvk.Kind != "" { + byGVK[gvk] = s.ID } } c.schemas = schemas c.byGVR = byGVR + c.byGVK = byGVK +} + +func (c *Collection) Schema(id string) *types.Schema { + return c.schemas[id] +} + +func (c *Collection) IDs() (result []string) { + seen := map[string]bool{} + for _, id := range c.byGVR { + if seen[id] { + continue + } + seen[id] = true + result = append(result, id) + } + return } func (c *Collection) ByGVR(gvr schema.GroupVersionResource) string { - gvr.Resource = strings.ToLower(gvr.Resource) + id, ok := c.byGVR[gvr] + if ok { + return id + } + gvr.Resource = name.GuessPluralName(strings.ToLower(gvr.Resource)) return c.byGVR[gvr] } +func (c *Collection) ByGVK(gvk schema.GroupVersionKind) string { + return c.byGVK[gvk] +} + func (c *Collection) AddTemplate(template *Template) { if template.Kind != "" { c.templates[template.Group+"/"+template.Kind] = template diff --git a/pkg/resources/schema/factory.go b/pkg/resources/schema/factory.go index ad57194..390c267 100644 --- a/pkg/resources/schema/factory.go +++ b/pkg/resources/schema/factory.go @@ -4,6 +4,8 @@ import ( "fmt" "net/http" + "github.com/rancher/naok/pkg/table" + "github.com/rancher/naok/pkg/accesscontrol" "github.com/rancher/naok/pkg/attributes" "github.com/rancher/norman/pkg/api/builtin" @@ -121,5 +123,8 @@ func (c *Collection) applyTemplates(schemas *types.Schemas, schema *types.Schema if t.Customize != nil { t.Customize(schema) } + if len(t.Columns) > 0 { + schemas.AddMapper(schema.ID, table.NewColumns(t.ComputedColumns, t.Columns...)) + } } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 6f49017..89bfa92 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -4,6 +4,13 @@ import ( "context" "net/http" + "github.com/rancher/wrangler/pkg/generic" + schema2 "k8s.io/apimachinery/pkg/runtime/schema" + + "github.com/rancher/naok/pkg/clustercache" + + "github.com/rancher/wrangler-api/pkg/generated/controllers/core" + "github.com/rancher/naok/pkg/accesscontrol" "github.com/rancher/naok/pkg/client" "github.com/rancher/naok/pkg/controllers/schema" @@ -35,6 +42,11 @@ func Run(ctx context.Context, cfg Config) error { return err } + core, err := core.NewFactoryFromConfig(restConfig) + if err != nil { + return err + } + k8s, err := kubernetes.NewForConfig(restConfig) if err != nil { return err @@ -55,14 +67,20 @@ func Run(ctx context.Context, cfg Config) error { return err } + ccache := clustercache.NewClusterCache(ctx, cf.DynamicClient()) + sf := resources.SchemaFactory(cf, accesscontrol.NewAccessStore(rbac.Rbac().V1()), - k8s) + k8s, + ccache, + core.Core().V1().ConfigMap(), + core.Core().V1().Secret()) - schema.Register(ctx, + sync := schema.Register(ctx, k8s.Discovery(), crd.Apiextensions().V1beta1().CustomResourceDefinition(), api.Apiregistration().V1().APIService(), + ccache, sf) handler, err := publicapi.NewHandler(restConfig, sf) @@ -70,10 +88,24 @@ func Run(ctx context.Context, cfg Config) error { return err } + for _, controllers := range []controllers{api, crd, rbac} { + for gvk, controller := range controllers.Controllers() { + ccache.AddController(gvk, controller.Informer()) + } + } + if err := start.All(ctx, 5, api, crd, rbac); err != nil { return err } + if err := sync(); err != nil { + return err + } + logrus.Infof("listening on %s", cfg.ListenAddress) return http.ListenAndServe(cfg.ListenAddress, handler) } + +type controllers interface { + Controllers() map[schema2.GroupVersionKind]*generic.Controller +} diff --git a/pkg/table/mapper.go b/pkg/table/mapper.go index ec929bc..d59a888 100644 --- a/pkg/table/mapper.go +++ b/pkg/table/mapper.go @@ -23,8 +23,16 @@ type ColumnMapper struct { types.EmptyMapper } +func NewColumns(computed func(data.Object), columns ...Column) *ColumnMapper { + return &ColumnMapper{ + definition: Table{ + Columns: columns, + Computed: computed, + }, + } +} + func (t *ColumnMapper) FromInternal(d data.Object) { - d.Map("metadata").Set("columns", t.definition.Columns) if t.definition.Computed != nil { t.definition.Computed(d) }