Show resource relationships

This commit is contained in:
Darren Shepherd 2020-06-22 08:49:49 -07:00
parent 02b0602945
commit d796ed60a7
6 changed files with 478 additions and 46 deletions

View File

@ -7,48 +7,49 @@ import (
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/stores/proxy"
"github.com/rancher/steve/pkg/summarycache"
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/summary"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
func DefaultTemplate(clientGetter proxy.ClientGetter, asl accesscontrol.AccessSetLookup) schema.Template {
func DefaultTemplate(clientGetter proxy.ClientGetter,
summaryCache *summarycache.SummaryCache,
asl accesscontrol.AccessSetLookup) schema.Template {
return schema.Template{
Store: proxy.NewProxyStore(clientGetter, asl),
Formatter: Formatter,
Store: proxy.NewProxyStore(clientGetter, summaryCache, asl),
Formatter: formatter(summaryCache),
}
}
func DefaultFormatter(next types.Formatter) types.Formatter {
return types.FormatterChain(Formatter, next)
}
func formatter(summarycache *summarycache.SummaryCache) types.Formatter {
return func(request *types.APIRequest, resource *types.RawResource) {
meta, err := meta.Accessor(resource.APIObject.Object)
if err != nil {
return
}
func Formatter(request *types.APIRequest, resource *types.RawResource) {
meta, err := meta.Accessor(resource.APIObject.Object)
if err != nil {
return
}
selfLink := meta.GetSelfLink()
if selfLink == "" {
return
}
selfLink := meta.GetSelfLink()
if selfLink == "" {
return
}
u := request.URLBuilder.RelativeToRoot(selfLink)
resource.Links["view"] = u
u := request.URLBuilder.RelativeToRoot(selfLink)
resource.Links["view"] = u
if _, ok := resource.Links["update"]; !ok {
resource.Links["update"] = u
}
if _, ok := resource.Links["update"]; !ok {
resource.Links["update"] = u
}
if unstr, ok := resource.APIObject.Object.(*unstructured.Unstructured); ok {
summary := summary.Summarize(unstr)
data.PutValue(unstr.Object, map[string]interface{}{
"name": summary.State,
"error": summary.Error,
"transitioning": summary.Transitioning,
"message": strings.Join(summary.Message, ":"),
}, "metadata", "state")
if unstr, ok := resource.APIObject.Object.(*unstructured.Unstructured); ok {
summary, rel := summarycache.SummaryAndRelationship(unstr)
data.PutValue(unstr.Object, map[string]interface{}{
"name": summary.State,
"error": summary.Error,
"transitioning": summary.Transitioning,
"message": strings.Join(summary.Message, ":"),
}, "metadata", "state")
data.PutValue(unstr.Object, rel, "metadata", "relationships")
}
}
}

View File

@ -3,6 +3,8 @@ package resources
import (
"context"
"github.com/rancher/steve/pkg/summarycache"
"github.com/rancher/apiserver/pkg/store/apiroot"
"github.com/rancher/apiserver/pkg/subscribe"
"github.com/rancher/apiserver/pkg/types"
@ -31,17 +33,20 @@ func DefaultSchemas(ctx context.Context, baseSchema *types.APISchemas, ccache cl
return baseSchema, err
}
func DefaultSchemaTemplates(cf *client.Factory, lookup accesscontrol.AccessSetLookup, discovery discovery.DiscoveryInterface) []schema.Template {
func DefaultSchemaTemplates(cf *client.Factory,
summaryCache *summarycache.SummaryCache,
lookup accesscontrol.AccessSetLookup,
discovery discovery.DiscoveryInterface) []schema.Template {
return []schema.Template{
common.DefaultTemplate(cf, lookup),
common.DefaultTemplate(cf, summaryCache, lookup),
apigroups.Template(discovery),
{
ID: "configmap",
Formatter: common.DefaultFormatter(helm.DropHelmData),
Formatter: helm.DropHelmData,
},
{
ID: "secret",
Formatter: common.DefaultFormatter(helm.DropHelmData),
Formatter: helm.DropHelmData,
},
}
}

View File

@ -138,6 +138,8 @@ func (c *Collection) applyTemplates(schema *types.APISchema) {
}
if schema.Formatter == nil {
schema.Formatter = t.Formatter
} else if t.Formatter != nil {
schema.Formatter = types.FormatterChain(t.Formatter, schema.Formatter)
}
if schema.Store == nil {
if t.StoreFactory == nil {

View File

@ -17,6 +17,7 @@ import (
"github.com/rancher/steve/pkg/resources/schemas"
"github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/server/handler"
"github.com/rancher/steve/pkg/summarycache"
)
var ErrConfigRequired = errors.New("rest config is required")
@ -72,15 +73,19 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio
return nil, nil, err
}
server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl, server.K8s.Discovery())...)
sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
summaryCache := summarycache.New(sf)
ccache.OnAdd(ctx, summaryCache.OnAdd)
ccache.OnRemove(ctx, summaryCache.OnRemove)
ccache.OnChange(ctx, summaryCache.OnChange)
server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, summaryCache, asl, server.K8s.Discovery())...)
cols, err := common.NewDynamicColumns(server.RestConfig)
if err != nil {
return nil, nil, err
}
sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf)
sync := schemacontroller.Register(ctx,

View File

@ -1,6 +1,7 @@
package proxy
import (
"context"
"encoding/json"
"fmt"
"io"
@ -16,7 +17,9 @@ import (
"github.com/rancher/steve/pkg/stores/partition"
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/schemas/validation"
"github.com/rancher/wrangler/pkg/summary"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -44,17 +47,23 @@ type ClientGetter interface {
TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
}
type Store struct {
clientGetter ClientGetter
type RelationshipNotifier interface {
OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship
}
func NewProxyStore(clientGetter ClientGetter, lookup accesscontrol.AccessSetLookup) types.Store {
type Store struct {
clientGetter ClientGetter
notifier RelationshipNotifier
}
func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store {
return &errorStore{
Store: &WatchRefresh{
Store: &partition.Store{
Partitioner: &rbacPartitioner{
proxyStore: &Store{
clientGetter: clientGetter,
notifier: notifier,
},
},
},
@ -286,17 +295,35 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.Resource
defer watcher.Stop()
logrus.Debugf("opening watcher for %s", schema.ID)
eg, ctx := errgroup.WithContext(apiOp.Context())
go func() {
<-apiOp.Request.Context().Done()
<-ctx.Done()
watcher.Stop()
}()
for event := range watcher.ResultChan() {
if event.Type == watch.Error {
continue
eg.Go(func() error {
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
obj, err := s.byID(apiOp, schema, rel.Name)
if err == nil {
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj)
}
}
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
}
return fmt.Errorf("closed")
})
eg.Go(func() error {
for event := range watcher.ResultChan() {
if event.Type == watch.Error {
continue
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
}
}
return fmt.Errorf("closed")
})
_ = eg.Wait()
return
}
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) {

View File

@ -0,0 +1,392 @@
package summarycache
import (
"context"
"fmt"
"strings"
"sync"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/schema/converter"
"github.com/rancher/wrangler/pkg/slice"
"github.com/rancher/wrangler/pkg/summary"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
runtimeschema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)
const (
relationshipIndex = "relationshipIndex"
)
var (
cbID = 0
)
type Relationship struct {
ToID string `json:"toId,omitempty"`
ToType string `json:"toType,omitempty"`
ToNamespace string `json:"toNamespace,omitempty"`
FromID string `json:"fromId,omitempty"`
FromType string `json:"fromType,omitempty"`
Rel string `json:"rel,omitempty"`
Selector string `json:"selector,omitempty"`
}
type SummaryCache struct {
sync.RWMutex
cache cache.ThreadSafeStore
schemas *schema.Collection
cbs map[int]chan *summary.Relationship
}
func New(schemas *schema.Collection) *SummaryCache {
indexers := cache.Indexers{}
s := &SummaryCache{
cache: cache.NewThreadSafeStore(indexers, cache.Indices{}),
schemas: schemas,
cbs: map[int]chan *summary.Relationship{},
}
indexers[relationshipIndex] = s.relationshipIndexer
return s
}
func (s *SummaryCache) OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship {
s.Lock()
defer s.Unlock()
apiVersion, kind := attributes.GVK(schema).ToAPIVersionAndKind()
ret := make(chan *summary.Relationship, 100)
cb := make(chan *summary.Relationship, 100)
id := cbID
cbID++
s.cbs[id] = cb
go func() {
defer close(ret)
for rel := range cb {
if rel.Kind == kind &&
rel.APIVersion == apiVersion &&
rel.Namespace == namespace {
ret <- rel
}
}
}()
go func() {
<-ctx.Done()
s.Lock()
defer s.Unlock()
delete(s.cbs, id)
}()
return cb
}
func (s *SummaryCache) SummaryAndRelationship(obj runtime.Object) (*summary.SummarizedObject, []Relationship) {
s.RLock()
defer s.RUnlock()
key := toKey(obj)
summaryObj, ok := s.cache.Get(key)
if !ok {
return summary.Summarized(obj), nil
}
summarized := summaryObj.(*summary.SummarizedObject)
relObjs, err := s.cache.ByIndex(relationshipIndex, key)
if err != nil {
return summarized, nil
}
var (
rels []Relationship
selectors = map[string]bool{}
)
for _, rel := range summarized.Relationships {
if rel.Selector != nil {
selectors[rel.APIVersion+"/"+rel.Kind] = true
}
rels = append(rels, s.toRel(summarized.Namespace, &rel))
}
for _, relObj := range relObjs {
summary := relObj.(*summary.SummarizedObject)
for _, rel := range summary.Relationships {
if !s.refersTo(summarized, &rel) {
continue
}
// drop references that an existing selector reference will cover
if rel.Inbound && len(selectors) > 0 && selectors[rel.APIVersion+"/"+rel.Kind] {
continue
}
rels = append(rels, s.reverseRel(summary, rel))
}
}
return summarized, rels
}
func (s *SummaryCache) reverseRel(summarized *summary.SummarizedObject, rel summary.Relationship) Relationship {
return s.toRel(summarized.Namespace, &summary.Relationship{
Name: summarized.Name,
Namespace: summarized.Namespace,
Kind: summarized.Kind,
APIVersion: summarized.APIVersion,
Inbound: !rel.Inbound,
Type: rel.Type,
})
}
func toSelector(sel *metav1.LabelSelector) string {
if sel == nil {
return ""
}
result, err := metav1.LabelSelectorAsSelector(sel)
if err != nil {
return ""
}
return result.String()
}
func (s *SummaryCache) toRel(ns string, rel *summary.Relationship) Relationship {
ns = s.resolveNamespace(ns, rel.Namespace, runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind))
id := rel.Name
if id != "" && ns != "" {
id = ns + "/" + rel.Name
}
if rel.Inbound {
return Relationship{
FromID: id,
FromType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)),
Rel: rel.Type,
}
}
toNS := ""
if rel.Selector != nil {
toNS = ns
}
return Relationship{
ToID: id,
ToType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)),
Rel: rel.Type,
ToNamespace: toNS,
Selector: toSelector(rel.Selector),
}
}
func (s *SummaryCache) Add(obj runtime.Object) {
summary, rels := s.process(obj)
key := toKey(summary)
s.cache.Add(key, summary)
for _, rel := range rels {
s.notify(rel)
}
}
func (s *SummaryCache) notify(rel *summary.Relationship) {
go func() {
s.Lock()
defer s.Unlock()
for _, cb := range s.cbs {
cb <- rel
}
}()
}
func (s *SummaryCache) Remove(obj runtime.Object) {
summary, rels := s.process(obj)
key := toKey(summary)
s.cache.Delete(key)
for _, rel := range rels {
s.notify(rel)
}
}
func (s *SummaryCache) Change(newObj, oldObj runtime.Object) {
_, oldRels := s.process(oldObj)
summary, rels := s.process(newObj)
key := toKey(summary)
if len(rels) == len(oldRels) {
for i, rel := range rels {
if !relEquals(oldRels[i], rel) {
s.notify(rel)
}
}
}
s.cache.Update(key, summary)
}
func (s *SummaryCache) process(obj runtime.Object) (*summary.SummarizedObject, []*summary.Relationship) {
var (
rels []*summary.Relationship
summary = summary.Summarized(obj)
)
for _, rel := range summary.Relationships {
gvk := runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)
schemaID := converter.GVKToSchemaID(gvk)
schema := s.schemas.Schema(schemaID)
if schema == nil {
continue
}
copy := rel
if copy.Namespace == "" && attributes.Namespaced(schema) {
copy.Namespace = summary.Namespace
}
rels = append(rels, &copy)
}
return summary, rels
}
func (s *SummaryCache) relationshipIndexer(obj interface{}) (result []string, err error) {
var (
summary = obj.(*summary.SummarizedObject)
)
for _, rel := range summary.Relationships {
gvk := runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)
result = append(result, toKeyFrom(s.resolveNamespace(summary.Namespace, rel.Namespace, gvk), rel.Name, gvk))
}
return
}
func (s *SummaryCache) resolveNamespace(sourceNamespace, toNamespace string, gvk runtimeschema.GroupVersionKind) string {
if toNamespace != "" {
return toNamespace
}
schema := s.schemas.Schema(converter.GVKToSchemaID(gvk))
if schema == nil || !attributes.Namespaced(schema) {
return toNamespace
}
return sourceNamespace
}
func (s *SummaryCache) refersTo(summarized *summary.SummarizedObject, rel *summary.Relationship) bool {
if summarized.APIVersion != rel.APIVersion ||
summarized.Kind != rel.Kind ||
summarized.Name != rel.Name {
return false
}
if summarized.Namespace == "" && rel.Namespace == "" {
return true
}
ns := s.resolveNamespace(summarized.Namespace, rel.Namespace, summarized.GroupVersionKind())
return summarized.Namespace == ns
}
func (s *SummaryCache) OnAdd(gvr runtimeschema.GroupVersionResource, key string, obj runtime.Object) error {
s.Add(obj)
return nil
}
func (s *SummaryCache) OnRemove(gvr runtimeschema.GroupVersionResource, key string, obj runtime.Object) error {
s.Remove(obj)
return nil
}
func (s *SummaryCache) OnChange(gvr runtimeschema.GroupVersionResource, key string, obj, oldObj runtime.Object) error {
s.Change(obj, oldObj)
return nil
}
func toKeyFrom(namespace, name string, gvk runtimeschema.GroupVersionKind, other ...string) string {
parts := []string{
gvk.Group,
gvk.Version,
gvk.Kind,
namespace,
name,
}
parts = append(parts, other...)
return strings.Join(parts, ",")
}
func toKey(obj runtime.Object) string {
var (
name, namespace = "", ""
gvk = obj.GetObjectKind().GroupVersionKind()
)
m, err := meta.Accessor(obj)
if err == nil {
name = m.GetName()
namespace = m.GetNamespace()
}
return toKeyFrom(namespace, name, gvk)
}
func toRelKey(key string, index int) string {
return fmt.Sprintf("%s:%d", key, index)
}
func relEquals(left, right *summary.Relationship) bool {
if left == nil && right == nil {
return true
} else if left == nil || right == nil {
return false
}
return left.Name == right.Name &&
left.Namespace == right.Namespace &&
left.ControlledBy == right.ControlledBy &&
left.Kind == right.Kind &&
left.APIVersion == right.APIVersion &&
left.Inbound == right.Inbound &&
left.Type == right.Type &&
selEquals(left.Selector, right.Selector)
}
func selEquals(left, right *metav1.LabelSelector) bool {
if left == nil && right == nil {
return true
} else if left == nil || right == nil {
return false
}
return reqEquals(left.MatchExpressions, right.MatchExpressions) &&
mapEquals(left.MatchLabels, right.MatchLabels)
}
func reqEquals(left, right []metav1.LabelSelectorRequirement) bool {
if len(left) != len(right) {
return false
}
for i, right := range right {
left := left[i]
if left.Key != right.Key ||
left.Operator != right.Operator ||
!slice.StringsEqual(left.Values, right.Values) {
return false
}
}
return true
}
func mapEquals(left, right map[string]string) bool {
if len(left) != len(right) {
return false
}
for k, v := range right {
if left[k] != v {
return false
}
}
return true
}