diff --git a/pkg/resources/common/formatter.go b/pkg/resources/common/formatter.go index d702284..39c7aef 100644 --- a/pkg/resources/common/formatter.go +++ b/pkg/resources/common/formatter.go @@ -7,48 +7,49 @@ import ( "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/schema" "github.com/rancher/steve/pkg/stores/proxy" + "github.com/rancher/steve/pkg/summarycache" "github.com/rancher/wrangler/pkg/data" - "github.com/rancher/wrangler/pkg/summary" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) -func DefaultTemplate(clientGetter proxy.ClientGetter, asl accesscontrol.AccessSetLookup) schema.Template { +func DefaultTemplate(clientGetter proxy.ClientGetter, + summaryCache *summarycache.SummaryCache, + asl accesscontrol.AccessSetLookup) schema.Template { return schema.Template{ - Store: proxy.NewProxyStore(clientGetter, asl), - Formatter: Formatter, + Store: proxy.NewProxyStore(clientGetter, summaryCache, asl), + Formatter: formatter(summaryCache), } } -func DefaultFormatter(next types.Formatter) types.Formatter { - return types.FormatterChain(Formatter, next) -} +func formatter(summarycache *summarycache.SummaryCache) types.Formatter { + return func(request *types.APIRequest, resource *types.RawResource) { + meta, err := meta.Accessor(resource.APIObject.Object) + if err != nil { + return + } -func Formatter(request *types.APIRequest, resource *types.RawResource) { - meta, err := meta.Accessor(resource.APIObject.Object) - if err != nil { - return - } + selfLink := meta.GetSelfLink() + if selfLink == "" { + return + } - selfLink := meta.GetSelfLink() - if selfLink == "" { - return - } + u := request.URLBuilder.RelativeToRoot(selfLink) + resource.Links["view"] = u - u := request.URLBuilder.RelativeToRoot(selfLink) - resource.Links["view"] = u + if _, ok := resource.Links["update"]; !ok { + resource.Links["update"] = u + } - if _, ok := resource.Links["update"]; !ok { - resource.Links["update"] = u - } - - if unstr, ok := resource.APIObject.Object.(*unstructured.Unstructured); ok { - summary := summary.Summarize(unstr) - data.PutValue(unstr.Object, map[string]interface{}{ - "name": summary.State, - "error": summary.Error, - "transitioning": summary.Transitioning, - "message": strings.Join(summary.Message, ":"), - }, "metadata", "state") + if unstr, ok := resource.APIObject.Object.(*unstructured.Unstructured); ok { + summary, rel := summarycache.SummaryAndRelationship(unstr) + data.PutValue(unstr.Object, map[string]interface{}{ + "name": summary.State, + "error": summary.Error, + "transitioning": summary.Transitioning, + "message": strings.Join(summary.Message, ":"), + }, "metadata", "state") + data.PutValue(unstr.Object, rel, "metadata", "relationships") + } } } diff --git a/pkg/resources/schema.go b/pkg/resources/schema.go index 355c000..e1e2af3 100644 --- a/pkg/resources/schema.go +++ b/pkg/resources/schema.go @@ -3,6 +3,8 @@ package resources import ( "context" + "github.com/rancher/steve/pkg/summarycache" + "github.com/rancher/apiserver/pkg/store/apiroot" "github.com/rancher/apiserver/pkg/subscribe" "github.com/rancher/apiserver/pkg/types" @@ -31,17 +33,20 @@ func DefaultSchemas(ctx context.Context, baseSchema *types.APISchemas, ccache cl return baseSchema, err } -func DefaultSchemaTemplates(cf *client.Factory, lookup accesscontrol.AccessSetLookup, discovery discovery.DiscoveryInterface) []schema.Template { +func DefaultSchemaTemplates(cf *client.Factory, + summaryCache *summarycache.SummaryCache, + lookup accesscontrol.AccessSetLookup, + discovery discovery.DiscoveryInterface) []schema.Template { return []schema.Template{ - common.DefaultTemplate(cf, lookup), + common.DefaultTemplate(cf, summaryCache, lookup), apigroups.Template(discovery), { ID: "configmap", - Formatter: common.DefaultFormatter(helm.DropHelmData), + Formatter: helm.DropHelmData, }, { ID: "secret", - Formatter: common.DefaultFormatter(helm.DropHelmData), + Formatter: helm.DropHelmData, }, } } diff --git a/pkg/schema/factory.go b/pkg/schema/factory.go index 24d6f9d..6c25e9e 100644 --- a/pkg/schema/factory.go +++ b/pkg/schema/factory.go @@ -138,6 +138,8 @@ func (c *Collection) applyTemplates(schema *types.APISchema) { } if schema.Formatter == nil { schema.Formatter = t.Formatter + } else if t.Formatter != nil { + schema.Formatter = types.FormatterChain(t.Formatter, schema.Formatter) } if schema.Store == nil { if t.StoreFactory == nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index bfb0e97..9d749e9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/rancher/steve/pkg/resources/schemas" "github.com/rancher/steve/pkg/schema" "github.com/rancher/steve/pkg/server/handler" + "github.com/rancher/steve/pkg/summarycache" ) var ErrConfigRequired = errors.New("rest config is required") @@ -72,15 +73,19 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio return nil, nil, err } - server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl, server.K8s.Discovery())...) + sf := schema.NewCollection(ctx, server.BaseSchemas, asl) + summaryCache := summarycache.New(sf) + ccache.OnAdd(ctx, summaryCache.OnAdd) + ccache.OnRemove(ctx, summaryCache.OnRemove) + ccache.OnChange(ctx, summaryCache.OnChange) + + server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, summaryCache, asl, server.K8s.Discovery())...) cols, err := common.NewDynamicColumns(server.RestConfig) if err != nil { return nil, nil, err } - sf := schema.NewCollection(ctx, server.BaseSchemas, asl) - schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf) sync := schemacontroller.Register(ctx, diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index babbf43..1ad337f 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -1,6 +1,7 @@ package proxy import ( + "context" "encoding/json" "fmt" "io" @@ -16,7 +17,9 @@ import ( "github.com/rancher/steve/pkg/stores/partition" "github.com/rancher/wrangler/pkg/data" "github.com/rancher/wrangler/pkg/schemas/validation" + "github.com/rancher/wrangler/pkg/summary" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -44,17 +47,23 @@ type ClientGetter interface { TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) } -type Store struct { - clientGetter ClientGetter +type RelationshipNotifier interface { + OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship } -func NewProxyStore(clientGetter ClientGetter, lookup accesscontrol.AccessSetLookup) types.Store { +type Store struct { + clientGetter ClientGetter + notifier RelationshipNotifier +} + +func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { return &errorStore{ Store: &WatchRefresh{ Store: &partition.Store{ Partitioner: &rbacPartitioner{ proxyStore: &Store{ clientGetter: clientGetter, + notifier: notifier, }, }, }, @@ -286,17 +295,35 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.Resource defer watcher.Stop() logrus.Debugf("opening watcher for %s", schema.ID) + eg, ctx := errgroup.WithContext(apiOp.Context()) + go func() { - <-apiOp.Request.Context().Done() + <-ctx.Done() watcher.Stop() }() - for event := range watcher.ResultChan() { - if event.Type == watch.Error { - continue + eg.Go(func() error { + for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) { + obj, err := s.byID(apiOp, schema, rel.Name) + if err == nil { + result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj) + } } - result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object) - } + return fmt.Errorf("closed") + }) + + eg.Go(func() error { + for event := range watcher.ResultChan() { + if event.Type == watch.Error { + continue + result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object) + } + } + return fmt.Errorf("closed") + }) + + _ = eg.Wait() + return } func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) { diff --git a/pkg/summarycache/summarycache.go b/pkg/summarycache/summarycache.go new file mode 100644 index 0000000..00d4edf --- /dev/null +++ b/pkg/summarycache/summarycache.go @@ -0,0 +1,392 @@ +package summarycache + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/attributes" + "github.com/rancher/steve/pkg/schema" + "github.com/rancher/steve/pkg/schema/converter" + "github.com/rancher/wrangler/pkg/slice" + "github.com/rancher/wrangler/pkg/summary" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + runtimeschema "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +const ( + relationshipIndex = "relationshipIndex" +) + +var ( + cbID = 0 +) + +type Relationship struct { + ToID string `json:"toId,omitempty"` + ToType string `json:"toType,omitempty"` + ToNamespace string `json:"toNamespace,omitempty"` + FromID string `json:"fromId,omitempty"` + FromType string `json:"fromType,omitempty"` + Rel string `json:"rel,omitempty"` + Selector string `json:"selector,omitempty"` +} + +type SummaryCache struct { + sync.RWMutex + cache cache.ThreadSafeStore + schemas *schema.Collection + cbs map[int]chan *summary.Relationship +} + +func New(schemas *schema.Collection) *SummaryCache { + indexers := cache.Indexers{} + s := &SummaryCache{ + cache: cache.NewThreadSafeStore(indexers, cache.Indices{}), + schemas: schemas, + cbs: map[int]chan *summary.Relationship{}, + } + indexers[relationshipIndex] = s.relationshipIndexer + return s +} + +func (s *SummaryCache) OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship { + s.Lock() + defer s.Unlock() + + apiVersion, kind := attributes.GVK(schema).ToAPIVersionAndKind() + ret := make(chan *summary.Relationship, 100) + cb := make(chan *summary.Relationship, 100) + id := cbID + cbID++ + s.cbs[id] = cb + + go func() { + defer close(ret) + for rel := range cb { + if rel.Kind == kind && + rel.APIVersion == apiVersion && + rel.Namespace == namespace { + ret <- rel + } + } + }() + + go func() { + <-ctx.Done() + s.Lock() + defer s.Unlock() + delete(s.cbs, id) + }() + + return cb +} + +func (s *SummaryCache) SummaryAndRelationship(obj runtime.Object) (*summary.SummarizedObject, []Relationship) { + s.RLock() + defer s.RUnlock() + + key := toKey(obj) + summaryObj, ok := s.cache.Get(key) + if !ok { + return summary.Summarized(obj), nil + } + + summarized := summaryObj.(*summary.SummarizedObject) + + relObjs, err := s.cache.ByIndex(relationshipIndex, key) + if err != nil { + return summarized, nil + } + + var ( + rels []Relationship + selectors = map[string]bool{} + ) + + for _, rel := range summarized.Relationships { + if rel.Selector != nil { + selectors[rel.APIVersion+"/"+rel.Kind] = true + } + rels = append(rels, s.toRel(summarized.Namespace, &rel)) + } + + for _, relObj := range relObjs { + summary := relObj.(*summary.SummarizedObject) + for _, rel := range summary.Relationships { + if !s.refersTo(summarized, &rel) { + continue + } + // drop references that an existing selector reference will cover + if rel.Inbound && len(selectors) > 0 && selectors[rel.APIVersion+"/"+rel.Kind] { + continue + } + rels = append(rels, s.reverseRel(summary, rel)) + } + } + + return summarized, rels +} + +func (s *SummaryCache) reverseRel(summarized *summary.SummarizedObject, rel summary.Relationship) Relationship { + return s.toRel(summarized.Namespace, &summary.Relationship{ + Name: summarized.Name, + Namespace: summarized.Namespace, + Kind: summarized.Kind, + APIVersion: summarized.APIVersion, + Inbound: !rel.Inbound, + Type: rel.Type, + }) +} + +func toSelector(sel *metav1.LabelSelector) string { + if sel == nil { + return "" + } + result, err := metav1.LabelSelectorAsSelector(sel) + if err != nil { + return "" + } + return result.String() +} + +func (s *SummaryCache) toRel(ns string, rel *summary.Relationship) Relationship { + ns = s.resolveNamespace(ns, rel.Namespace, runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)) + + id := rel.Name + if id != "" && ns != "" { + id = ns + "/" + rel.Name + } + + if rel.Inbound { + return Relationship{ + FromID: id, + FromType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)), + Rel: rel.Type, + } + } + + toNS := "" + if rel.Selector != nil { + toNS = ns + } + + return Relationship{ + ToID: id, + ToType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)), + Rel: rel.Type, + ToNamespace: toNS, + Selector: toSelector(rel.Selector), + } +} + +func (s *SummaryCache) Add(obj runtime.Object) { + summary, rels := s.process(obj) + key := toKey(summary) + + s.cache.Add(key, summary) + for _, rel := range rels { + s.notify(rel) + } +} + +func (s *SummaryCache) notify(rel *summary.Relationship) { + go func() { + s.Lock() + defer s.Unlock() + for _, cb := range s.cbs { + cb <- rel + } + }() +} + +func (s *SummaryCache) Remove(obj runtime.Object) { + summary, rels := s.process(obj) + key := toKey(summary) + + s.cache.Delete(key) + for _, rel := range rels { + s.notify(rel) + } +} + +func (s *SummaryCache) Change(newObj, oldObj runtime.Object) { + _, oldRels := s.process(oldObj) + summary, rels := s.process(newObj) + key := toKey(summary) + + if len(rels) == len(oldRels) { + for i, rel := range rels { + if !relEquals(oldRels[i], rel) { + s.notify(rel) + } + } + } + s.cache.Update(key, summary) +} + +func (s *SummaryCache) process(obj runtime.Object) (*summary.SummarizedObject, []*summary.Relationship) { + var ( + rels []*summary.Relationship + summary = summary.Summarized(obj) + ) + + for _, rel := range summary.Relationships { + gvk := runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind) + schemaID := converter.GVKToSchemaID(gvk) + schema := s.schemas.Schema(schemaID) + if schema == nil { + continue + } + copy := rel + if copy.Namespace == "" && attributes.Namespaced(schema) { + copy.Namespace = summary.Namespace + } + rels = append(rels, ©) + } + + return summary, rels +} + +func (s *SummaryCache) relationshipIndexer(obj interface{}) (result []string, err error) { + var ( + summary = obj.(*summary.SummarizedObject) + ) + + for _, rel := range summary.Relationships { + gvk := runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind) + result = append(result, toKeyFrom(s.resolveNamespace(summary.Namespace, rel.Namespace, gvk), rel.Name, gvk)) + } + + return +} + +func (s *SummaryCache) resolveNamespace(sourceNamespace, toNamespace string, gvk runtimeschema.GroupVersionKind) string { + if toNamespace != "" { + return toNamespace + } + schema := s.schemas.Schema(converter.GVKToSchemaID(gvk)) + if schema == nil || !attributes.Namespaced(schema) { + return toNamespace + } + return sourceNamespace +} + +func (s *SummaryCache) refersTo(summarized *summary.SummarizedObject, rel *summary.Relationship) bool { + if summarized.APIVersion != rel.APIVersion || + summarized.Kind != rel.Kind || + summarized.Name != rel.Name { + return false + } + if summarized.Namespace == "" && rel.Namespace == "" { + return true + } + ns := s.resolveNamespace(summarized.Namespace, rel.Namespace, summarized.GroupVersionKind()) + return summarized.Namespace == ns +} + +func (s *SummaryCache) OnAdd(gvr runtimeschema.GroupVersionResource, key string, obj runtime.Object) error { + s.Add(obj) + return nil +} + +func (s *SummaryCache) OnRemove(gvr runtimeschema.GroupVersionResource, key string, obj runtime.Object) error { + s.Remove(obj) + return nil +} + +func (s *SummaryCache) OnChange(gvr runtimeschema.GroupVersionResource, key string, obj, oldObj runtime.Object) error { + s.Change(obj, oldObj) + return nil +} + +func toKeyFrom(namespace, name string, gvk runtimeschema.GroupVersionKind, other ...string) string { + parts := []string{ + gvk.Group, + gvk.Version, + gvk.Kind, + namespace, + name, + } + parts = append(parts, other...) + return strings.Join(parts, ",") +} + +func toKey(obj runtime.Object) string { + var ( + name, namespace = "", "" + gvk = obj.GetObjectKind().GroupVersionKind() + ) + + m, err := meta.Accessor(obj) + if err == nil { + name = m.GetName() + namespace = m.GetNamespace() + } + + return toKeyFrom(namespace, name, gvk) +} + +func toRelKey(key string, index int) string { + return fmt.Sprintf("%s:%d", key, index) +} + +func relEquals(left, right *summary.Relationship) bool { + if left == nil && right == nil { + return true + } else if left == nil || right == nil { + return false + } + + return left.Name == right.Name && + left.Namespace == right.Namespace && + left.ControlledBy == right.ControlledBy && + left.Kind == right.Kind && + left.APIVersion == right.APIVersion && + left.Inbound == right.Inbound && + left.Type == right.Type && + selEquals(left.Selector, right.Selector) +} + +func selEquals(left, right *metav1.LabelSelector) bool { + if left == nil && right == nil { + return true + } else if left == nil || right == nil { + return false + } + + return reqEquals(left.MatchExpressions, right.MatchExpressions) && + mapEquals(left.MatchLabels, right.MatchLabels) +} + +func reqEquals(left, right []metav1.LabelSelectorRequirement) bool { + if len(left) != len(right) { + return false + } + for i, right := range right { + left := left[i] + if left.Key != right.Key || + left.Operator != right.Operator || + !slice.StringsEqual(left.Values, right.Values) { + return false + } + } + return true +} + +func mapEquals(left, right map[string]string) bool { + if len(left) != len(right) { + return false + } + for k, v := range right { + if left[k] != v { + return false + } + } + return true +}