From d457428bfff9c4b7dfffc1a9ad38a770ca82d184 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Thu, 12 Mar 2020 19:14:40 -0700 Subject: [PATCH] Add states to counts --- pkg/client/factory.go | 11 +++ pkg/clustercache/controller.go | 63 ++++++++----- pkg/server/resources/counts/counts.go | 124 ++++++++++++++++++++------ pkg/server/server.go | 2 +- 4 files changed, 149 insertions(+), 51 deletions(-) diff --git a/pkg/client/factory.go b/pkg/client/factory.go index ae4a06a..650bfd7 100644 --- a/pkg/client/factory.go +++ b/pkg/client/factory.go @@ -20,6 +20,7 @@ type Factory struct { clientCfg *rest.Config watchClientCfg *rest.Config metadata metadata.Interface + dynamic dynamic.Interface Config *rest.Config } @@ -66,7 +67,13 @@ func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) { return nil, err } + d, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, err + } + return &Factory{ + dynamic: d, metadata: md, impersonate: impersonate, tableClientCfg: tableClientCfg, @@ -81,6 +88,10 @@ func (p *Factory) MetadataClient() metadata.Interface { return p.metadata } +func (p *Factory) DynamicClient() dynamic.Interface { + return p.dynamic +} + func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { return newClient(ctx, p.clientCfg, s, namespace, p.impersonate) } diff --git a/pkg/clustercache/controller.go b/pkg/clustercache/controller.go index b082b79..0e25abf 100644 --- a/pkg/clustercache/controller.go +++ b/pkg/clustercache/controller.go @@ -2,7 +2,6 @@ package clustercache import ( "context" - "fmt" "sync" "time" @@ -15,26 +14,28 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" schema2 "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/metadata" - "k8s.io/client-go/metadata/metadatainformer" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error +type ChangeHandler func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error type ClusterCache interface { List(gvr schema2.GroupVersionResource) []interface{} OnAdd(ctx context.Context, handler Handler) OnRemove(ctx context.Context, handler Handler) - OnChange(ctx context.Context, handler Handler) + OnChange(ctx context.Context, handler ChangeHandler) OnSchemas(schemas *schema.Collection) error } type event struct { - add bool - gvr schema2.GroupVersionResource - obj runtime.Object + add bool + gvr schema2.GroupVersionResource + obj runtime.Object + oldObj runtime.Object } type watcher struct { @@ -51,7 +52,7 @@ type clusterCache struct { ctx context.Context typed map[schema2.GroupVersionKind]cache.SharedIndexInformer - informerFactory metadatainformer.SharedInformerFactory + informerFactory dynamicinformer.DynamicSharedInformerFactory controllerFactory generic.ControllerManager watchers map[schema2.GroupVersionResource]*watcher workqueue workqueue.DelayingInterface @@ -61,11 +62,11 @@ type clusterCache struct { changeHandlers cancelCollection } -func NewClusterCache(ctx context.Context, client metadata.Interface) ClusterCache { +func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache { c := &clusterCache{ ctx: ctx, typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{}, - informerFactory: metadatainformer.NewSharedInformerFactory(client, 2*time.Hour), + informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, 2*time.Hour), watchers: map[schema2.GroupVersionResource]*watcher{}, workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), } @@ -103,6 +104,17 @@ func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource, }) } }, + UpdateFunc: func(oldObj, newObj interface{}) { + if rObj, ok := newObj.(runtime.Object); ok { + if rOldObj, ok := oldObj.(runtime.Object); ok { + h.workqueue.Add(event{ + obj: rObj, + oldObj: rOldObj, + gvr: gvr, + }) + } + } + }, DeleteFunc: func(obj interface{}) { if rObj, ok := obj.(runtime.Object); ok { h.workqueue.Add(event{ @@ -155,10 +167,6 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { logrus.Infof("Watching metadata for %s", gvk.String()) h.addResourceEventHandler(gvr, w.informer) - name := fmt.Sprintf("meta %s", gvk) - h.controllerFactory.AddHandler(ctx, gvk, w.informer, name, func(key string, obj runtime.Object) (object runtime.Object, e error) { - return callAll(h.changeHandlers.List(), gvr, key, obj) - }) } for gvr, w := range h.watchers { @@ -219,13 +227,18 @@ func (h *clusterCache) start() { } key := toKey(event.obj) - if event.add { - _, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj) + if event.oldObj != nil { + _, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, event.oldObj) + if err != nil { + logrus.Errorf("failed to handle add event: %v", err) + } + } else if event.add { + _, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, nil) if err != nil { logrus.Errorf("failed to handle add event: %v", err) } } else { - _, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj) + _, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj, nil) if err != nil { logrus.Errorf("failed to handle remove event: %v", err) } @@ -253,16 +266,22 @@ func (h *clusterCache) OnRemove(ctx context.Context, handler Handler) { h.removeHandlers.Add(ctx, handler) } -func (h *clusterCache) OnChange(ctx context.Context, handler Handler) { +func (h *clusterCache) OnChange(ctx context.Context, handler ChangeHandler) { h.changeHandlers.Add(ctx, handler) } -func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj runtime.Object) (runtime.Object, error) { +func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) (runtime.Object, error) { var errs []error for _, handler := range handlers { - f := handler.(Handler) - if err := f(gvr, key, obj); err != nil { - errs = append(errs, err) + if f, ok := handler.(Handler); ok { + if err := f(gvr, key, obj); err != nil { + errs = append(errs, err) + } + } + if f, ok := handler.(ChangeHandler); ok { + if err := f(gvr, key, obj, oldObj); err != nil { + errs = append(errs, err) + } } } diff --git a/pkg/server/resources/counts/counts.go b/pkg/server/resources/counts/counts.go index 9fa7137..815e91f 100644 --- a/pkg/server/resources/counts/counts.go +++ b/pkg/server/resources/counts/counts.go @@ -10,7 +10,9 @@ import ( "github.com/rancher/steve/pkg/clustercache" "github.com/rancher/steve/pkg/schemaserver/store/empty" "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/rancher/wrangler/pkg/summary" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" schema2 "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -46,10 +48,17 @@ type Count struct { Counts map[string]ItemCount `json:"counts"` } +type Summary struct { + Count int `json:"count,omitempty"` + States map[string]int `json:"states,omitempty"` + Error int `json:"errors,omitempty"` + Transitioning int `json:"transitioning,omitempty"` +} + type ItemCount struct { - Count int `json:"count,omitempty"` - Namespaces map[string]int `json:"namespaces,omitempty"` - Revision int `json:"revision,omitempty"` + Summary Summary `json:"summary,omitempty"` + Namespaces map[string]Summary `json:"namespaces,omitempty"` + Revision int `json:"revision,omitempty"` } type Store struct { @@ -105,7 +114,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. countLock.Unlock() }() - onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj runtime.Object) error { + onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj, oldObj runtime.Object) error { countLock.Lock() defer countLock.Unlock() @@ -118,7 +127,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return nil } - _, namespace, revision, ok := getInfo(obj) + _, namespace, revision, summary, ok := getInfo(obj) if !ok { return nil } @@ -128,27 +137,32 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return nil } - if add { - itemCount.Count++ - if namespace != "" { - itemCount.Namespaces[namespace]++ + if oldObj != nil { + if _, _, _, oldSummary, ok := getInfo(oldObj); ok { + if oldSummary.Transitioning == summary.Transitioning && + oldSummary.Error == summary.Error && + oldSummary.State == summary.State { + return nil + } + itemCount = removeCounts(itemCount, namespace, oldSummary) + } else { + return nil } + } else if add { + itemCount = addCounts(itemCount, namespace, summary) } else { - itemCount.Count-- - if namespace != "" { - itemCount.Namespaces[namespace]-- - } + itemCount = removeCounts(itemCount, namespace, summary) } counts[schema.ID] = itemCount countsCopy := map[string]ItemCount{} for k, v := range counts { - ns := map[string]int{} + ns := map[string]Summary{} for i, j := range v.Namespaces { ns[i] = j } countsCopy[k] = ItemCount{ - Count: v.Count, + Summary: v.Summary, Revision: v.Revision, Namespaces: ns, } @@ -167,10 +181,13 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. } s.ccache.OnAdd(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { - return onChange(true, gvr, key, obj) + return onChange(true, gvr, key, obj, nil) + }) + s.ccache.OnChange(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error { + return onChange(true, gvr, key, obj, oldObj) }) s.ccache.OnRemove(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { - return onChange(false, gvr, key, obj) + return onChange(false, gvr, key, obj, nil) }) return result, nil @@ -200,23 +217,77 @@ func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISche return } -func getInfo(obj interface{}) (name string, namespace string, revision int, ok bool) { +func getInfo(obj interface{}) (name string, namespace string, revision int, summaryResult summary.Summary, ok bool) { r, ok := obj.(runtime.Object) if !ok { - return "", "", 0, false + return "", "", 0, summaryResult, false } meta, err := meta.Accessor(r) if err != nil { - return "", "", 0, false + return "", "", 0, summaryResult, false } revision, err = strconv.Atoi(meta.GetResourceVersion()) if err != nil { - return "", "", 0, false + return "", "", 0, summaryResult, false } - return meta.GetName(), meta.GetNamespace(), revision, true + if unstr, ok := obj.(*unstructured.Unstructured); ok { + summaryResult = summary.Summarize(unstr) + } + + return meta.GetName(), meta.GetNamespace(), revision, summaryResult, true +} + +func removeCounts(itemCount ItemCount, ns string, summary summary.Summary) ItemCount { + itemCount.Summary = removeSummary(itemCount.Summary, summary) + if ns == "" { + itemCount.Namespaces[ns] = removeSummary(itemCount.Namespaces[ns], summary) + } + return itemCount +} + +func addCounts(itemCount ItemCount, ns string, summary summary.Summary) ItemCount { + itemCount.Summary = addSummary(itemCount.Summary, summary) + if ns == "" { + itemCount.Namespaces[ns] = addSummary(itemCount.Namespaces[ns], summary) + } + return itemCount +} + +func removeSummary(counts Summary, summary summary.Summary) Summary { + counts.Count-- + if summary.Transitioning { + counts.Transitioning-- + } + if summary.Error { + counts.Error-- + } + if summary.State != "" { + if counts.States == nil { + counts.States = map[string]int{} + } + counts.States[summary.State] -= 1 + } + return counts +} + +func addSummary(counts Summary, summary summary.Summary) Summary { + counts.Count++ + if summary.Transitioning { + counts.Transitioning++ + } + if summary.Error { + counts.Error++ + } + if summary.State != "" { + if counts.States == nil { + counts.States = map[string]int{} + } + counts.States[summary.State] += 1 + } + return counts } func (s *Store) getCount(apiOp *types.APIRequest) Count { @@ -228,13 +299,13 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count { rev := 0 itemCount := ItemCount{ - Namespaces: map[string]int{}, + Namespaces: map[string]Summary{}, } all := access.Grants("list", "*", "*") for _, obj := range s.ccache.List(gvr) { - name, ns, revision, ok := getInfo(obj) + name, ns, revision, summary, ok := getInfo(obj) if !ok { continue } @@ -247,10 +318,7 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count { rev = revision } - itemCount.Count++ - if ns != "" { - itemCount.Namespaces[ns]++ - } + itemCount = addCounts(itemCount, ns, summary) } itemCount.Revision = rev diff --git a/pkg/server/server.go b/pkg/server/server.go index 88449de..a57adc6 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -66,7 +66,7 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio asl = accesscontrol.NewAccessStore(ctx, true, server.RBAC) } - ccache := clustercache.NewClusterCache(ctx, cf.MetadataClient()) + ccache := clustercache.NewClusterCache(ctx, cf.DynamicClient()) server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, ccache, cf) server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl, server.K8s.Discovery())...)