Add states to counts

This commit is contained in:
Darren Shepherd 2020-03-12 19:14:40 -07:00
parent 4f6e4d4e16
commit d457428bff
4 changed files with 149 additions and 51 deletions

View File

@ -20,6 +20,7 @@ type Factory struct {
clientCfg *rest.Config clientCfg *rest.Config
watchClientCfg *rest.Config watchClientCfg *rest.Config
metadata metadata.Interface metadata metadata.Interface
dynamic dynamic.Interface
Config *rest.Config Config *rest.Config
} }
@ -66,7 +67,13 @@ func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) {
return nil, err return nil, err
} }
d, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, err
}
return &Factory{ return &Factory{
dynamic: d,
metadata: md, metadata: md,
impersonate: impersonate, impersonate: impersonate,
tableClientCfg: tableClientCfg, tableClientCfg: tableClientCfg,
@ -81,6 +88,10 @@ func (p *Factory) MetadataClient() metadata.Interface {
return p.metadata return p.metadata
} }
func (p *Factory) DynamicClient() dynamic.Interface {
return p.dynamic
}
func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {
return newClient(ctx, p.clientCfg, s, namespace, p.impersonate) return newClient(ctx, p.clientCfg, s, namespace, p.impersonate)
} }

View File

@ -2,7 +2,6 @@ package clustercache
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"time" "time"
@ -15,26 +14,28 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
schema2 "k8s.io/apimachinery/pkg/runtime/schema" schema2 "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata" "k8s.io/client-go/dynamic"
"k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error
type ChangeHandler func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error
type ClusterCache interface { type ClusterCache interface {
List(gvr schema2.GroupVersionResource) []interface{} List(gvr schema2.GroupVersionResource) []interface{}
OnAdd(ctx context.Context, handler Handler) OnAdd(ctx context.Context, handler Handler)
OnRemove(ctx context.Context, handler Handler) OnRemove(ctx context.Context, handler Handler)
OnChange(ctx context.Context, handler Handler) OnChange(ctx context.Context, handler ChangeHandler)
OnSchemas(schemas *schema.Collection) error OnSchemas(schemas *schema.Collection) error
} }
type event struct { type event struct {
add bool add bool
gvr schema2.GroupVersionResource gvr schema2.GroupVersionResource
obj runtime.Object obj runtime.Object
oldObj runtime.Object
} }
type watcher struct { type watcher struct {
@ -51,7 +52,7 @@ type clusterCache struct {
ctx context.Context ctx context.Context
typed map[schema2.GroupVersionKind]cache.SharedIndexInformer typed map[schema2.GroupVersionKind]cache.SharedIndexInformer
informerFactory metadatainformer.SharedInformerFactory informerFactory dynamicinformer.DynamicSharedInformerFactory
controllerFactory generic.ControllerManager controllerFactory generic.ControllerManager
watchers map[schema2.GroupVersionResource]*watcher watchers map[schema2.GroupVersionResource]*watcher
workqueue workqueue.DelayingInterface workqueue workqueue.DelayingInterface
@ -61,11 +62,11 @@ type clusterCache struct {
changeHandlers cancelCollection changeHandlers cancelCollection
} }
func NewClusterCache(ctx context.Context, client metadata.Interface) ClusterCache { func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache {
c := &clusterCache{ c := &clusterCache{
ctx: ctx, ctx: ctx,
typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{}, typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{},
informerFactory: metadatainformer.NewSharedInformerFactory(client, 2*time.Hour), informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, 2*time.Hour),
watchers: map[schema2.GroupVersionResource]*watcher{}, watchers: map[schema2.GroupVersionResource]*watcher{},
workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"),
} }
@ -103,6 +104,17 @@ func (h *clusterCache) addResourceEventHandler(gvr schema2.GroupVersionResource,
}) })
} }
}, },
UpdateFunc: func(oldObj, newObj interface{}) {
if rObj, ok := newObj.(runtime.Object); ok {
if rOldObj, ok := oldObj.(runtime.Object); ok {
h.workqueue.Add(event{
obj: rObj,
oldObj: rOldObj,
gvr: gvr,
})
}
}
},
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
if rObj, ok := obj.(runtime.Object); ok { if rObj, ok := obj.(runtime.Object); ok {
h.workqueue.Add(event{ h.workqueue.Add(event{
@ -155,10 +167,6 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
logrus.Infof("Watching metadata for %s", gvk.String()) logrus.Infof("Watching metadata for %s", gvk.String())
h.addResourceEventHandler(gvr, w.informer) h.addResourceEventHandler(gvr, w.informer)
name := fmt.Sprintf("meta %s", gvk)
h.controllerFactory.AddHandler(ctx, gvk, w.informer, name, func(key string, obj runtime.Object) (object runtime.Object, e error) {
return callAll(h.changeHandlers.List(), gvr, key, obj)
})
} }
for gvr, w := range h.watchers { for gvr, w := range h.watchers {
@ -219,13 +227,18 @@ func (h *clusterCache) start() {
} }
key := toKey(event.obj) key := toKey(event.obj)
if event.add { if event.oldObj != nil {
_, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj) _, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, event.oldObj)
if err != nil {
logrus.Errorf("failed to handle add event: %v", err)
}
} else if event.add {
_, err := callAll(h.addHandlers.List(), event.gvr, key, event.obj, nil)
if err != nil { if err != nil {
logrus.Errorf("failed to handle add event: %v", err) logrus.Errorf("failed to handle add event: %v", err)
} }
} else { } else {
_, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj) _, err := callAll(h.removeHandlers.List(), event.gvr, key, event.obj, nil)
if err != nil { if err != nil {
logrus.Errorf("failed to handle remove event: %v", err) logrus.Errorf("failed to handle remove event: %v", err)
} }
@ -253,16 +266,22 @@ func (h *clusterCache) OnRemove(ctx context.Context, handler Handler) {
h.removeHandlers.Add(ctx, handler) h.removeHandlers.Add(ctx, handler)
} }
func (h *clusterCache) OnChange(ctx context.Context, handler Handler) { func (h *clusterCache) OnChange(ctx context.Context, handler ChangeHandler) {
h.changeHandlers.Add(ctx, handler) h.changeHandlers.Add(ctx, handler)
} }
func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj runtime.Object) (runtime.Object, error) { func callAll(handlers []interface{}, gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) (runtime.Object, error) {
var errs []error var errs []error
for _, handler := range handlers { for _, handler := range handlers {
f := handler.(Handler) if f, ok := handler.(Handler); ok {
if err := f(gvr, key, obj); err != nil { if err := f(gvr, key, obj); err != nil {
errs = append(errs, err) errs = append(errs, err)
}
}
if f, ok := handler.(ChangeHandler); ok {
if err := f(gvr, key, obj, oldObj); err != nil {
errs = append(errs, err)
}
} }
} }

View File

@ -10,7 +10,9 @@ import (
"github.com/rancher/steve/pkg/clustercache" "github.com/rancher/steve/pkg/clustercache"
"github.com/rancher/steve/pkg/schemaserver/store/empty" "github.com/rancher/steve/pkg/schemaserver/store/empty"
"github.com/rancher/steve/pkg/schemaserver/types" "github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/wrangler/pkg/summary"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
schema2 "k8s.io/apimachinery/pkg/runtime/schema" schema2 "k8s.io/apimachinery/pkg/runtime/schema"
) )
@ -46,10 +48,17 @@ type Count struct {
Counts map[string]ItemCount `json:"counts"` Counts map[string]ItemCount `json:"counts"`
} }
type Summary struct {
Count int `json:"count,omitempty"`
States map[string]int `json:"states,omitempty"`
Error int `json:"errors,omitempty"`
Transitioning int `json:"transitioning,omitempty"`
}
type ItemCount struct { type ItemCount struct {
Count int `json:"count,omitempty"` Summary Summary `json:"summary,omitempty"`
Namespaces map[string]int `json:"namespaces,omitempty"` Namespaces map[string]Summary `json:"namespaces,omitempty"`
Revision int `json:"revision,omitempty"` Revision int `json:"revision,omitempty"`
} }
type Store struct { type Store struct {
@ -105,7 +114,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
countLock.Unlock() countLock.Unlock()
}() }()
onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj runtime.Object) error { onChange := func(add bool, gvr schema2.GroupVersionResource, _ string, obj, oldObj runtime.Object) error {
countLock.Lock() countLock.Lock()
defer countLock.Unlock() defer countLock.Unlock()
@ -118,7 +127,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return nil return nil
} }
_, namespace, revision, ok := getInfo(obj) _, namespace, revision, summary, ok := getInfo(obj)
if !ok { if !ok {
return nil return nil
} }
@ -128,27 +137,32 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return nil return nil
} }
if add { if oldObj != nil {
itemCount.Count++ if _, _, _, oldSummary, ok := getInfo(oldObj); ok {
if namespace != "" { if oldSummary.Transitioning == summary.Transitioning &&
itemCount.Namespaces[namespace]++ oldSummary.Error == summary.Error &&
oldSummary.State == summary.State {
return nil
}
itemCount = removeCounts(itemCount, namespace, oldSummary)
} else {
return nil
} }
} else if add {
itemCount = addCounts(itemCount, namespace, summary)
} else { } else {
itemCount.Count-- itemCount = removeCounts(itemCount, namespace, summary)
if namespace != "" {
itemCount.Namespaces[namespace]--
}
} }
counts[schema.ID] = itemCount counts[schema.ID] = itemCount
countsCopy := map[string]ItemCount{} countsCopy := map[string]ItemCount{}
for k, v := range counts { for k, v := range counts {
ns := map[string]int{} ns := map[string]Summary{}
for i, j := range v.Namespaces { for i, j := range v.Namespaces {
ns[i] = j ns[i] = j
} }
countsCopy[k] = ItemCount{ countsCopy[k] = ItemCount{
Count: v.Count, Summary: v.Summary,
Revision: v.Revision, Revision: v.Revision,
Namespaces: ns, Namespaces: ns,
} }
@ -167,10 +181,13 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
} }
s.ccache.OnAdd(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { s.ccache.OnAdd(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error {
return onChange(true, gvr, key, obj) return onChange(true, gvr, key, obj, nil)
})
s.ccache.OnChange(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj, oldObj runtime.Object) error {
return onChange(true, gvr, key, obj, oldObj)
}) })
s.ccache.OnRemove(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error { s.ccache.OnRemove(apiOp.Context(), func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error {
return onChange(false, gvr, key, obj) return onChange(false, gvr, key, obj, nil)
}) })
return result, nil return result, nil
@ -200,23 +217,77 @@ func (s *Store) schemasToWatch(apiOp *types.APIRequest) (result []*types.APISche
return return
} }
func getInfo(obj interface{}) (name string, namespace string, revision int, ok bool) { func getInfo(obj interface{}) (name string, namespace string, revision int, summaryResult summary.Summary, ok bool) {
r, ok := obj.(runtime.Object) r, ok := obj.(runtime.Object)
if !ok { if !ok {
return "", "", 0, false return "", "", 0, summaryResult, false
} }
meta, err := meta.Accessor(r) meta, err := meta.Accessor(r)
if err != nil { if err != nil {
return "", "", 0, false return "", "", 0, summaryResult, false
} }
revision, err = strconv.Atoi(meta.GetResourceVersion()) revision, err = strconv.Atoi(meta.GetResourceVersion())
if err != nil { if err != nil {
return "", "", 0, false return "", "", 0, summaryResult, false
} }
return meta.GetName(), meta.GetNamespace(), revision, true if unstr, ok := obj.(*unstructured.Unstructured); ok {
summaryResult = summary.Summarize(unstr)
}
return meta.GetName(), meta.GetNamespace(), revision, summaryResult, true
}
func removeCounts(itemCount ItemCount, ns string, summary summary.Summary) ItemCount {
itemCount.Summary = removeSummary(itemCount.Summary, summary)
if ns == "" {
itemCount.Namespaces[ns] = removeSummary(itemCount.Namespaces[ns], summary)
}
return itemCount
}
func addCounts(itemCount ItemCount, ns string, summary summary.Summary) ItemCount {
itemCount.Summary = addSummary(itemCount.Summary, summary)
if ns == "" {
itemCount.Namespaces[ns] = addSummary(itemCount.Namespaces[ns], summary)
}
return itemCount
}
func removeSummary(counts Summary, summary summary.Summary) Summary {
counts.Count--
if summary.Transitioning {
counts.Transitioning--
}
if summary.Error {
counts.Error--
}
if summary.State != "" {
if counts.States == nil {
counts.States = map[string]int{}
}
counts.States[summary.State] -= 1
}
return counts
}
func addSummary(counts Summary, summary summary.Summary) Summary {
counts.Count++
if summary.Transitioning {
counts.Transitioning++
}
if summary.Error {
counts.Error++
}
if summary.State != "" {
if counts.States == nil {
counts.States = map[string]int{}
}
counts.States[summary.State] += 1
}
return counts
} }
func (s *Store) getCount(apiOp *types.APIRequest) Count { func (s *Store) getCount(apiOp *types.APIRequest) Count {
@ -228,13 +299,13 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count {
rev := 0 rev := 0
itemCount := ItemCount{ itemCount := ItemCount{
Namespaces: map[string]int{}, Namespaces: map[string]Summary{},
} }
all := access.Grants("list", "*", "*") all := access.Grants("list", "*", "*")
for _, obj := range s.ccache.List(gvr) { for _, obj := range s.ccache.List(gvr) {
name, ns, revision, ok := getInfo(obj) name, ns, revision, summary, ok := getInfo(obj)
if !ok { if !ok {
continue continue
} }
@ -247,10 +318,7 @@ func (s *Store) getCount(apiOp *types.APIRequest) Count {
rev = revision rev = revision
} }
itemCount.Count++ itemCount = addCounts(itemCount, ns, summary)
if ns != "" {
itemCount.Namespaces[ns]++
}
} }
itemCount.Revision = rev itemCount.Revision = rev

View File

@ -66,7 +66,7 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio
asl = accesscontrol.NewAccessStore(ctx, true, server.RBAC) asl = accesscontrol.NewAccessStore(ctx, true, server.RBAC)
} }
ccache := clustercache.NewClusterCache(ctx, cf.MetadataClient()) ccache := clustercache.NewClusterCache(ctx, cf.DynamicClient())
server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, ccache, cf) server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, ccache, cf)
server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl, server.K8s.Discovery())...) server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl, server.K8s.Discovery())...)