Add state info to relationships

This commit is contained in:
Darren Shepherd 2020-10-23 14:00:11 -07:00
parent a44863b331
commit 92638937df
2 changed files with 55 additions and 23 deletions

View File

@ -127,10 +127,8 @@ func setup(ctx context.Context, server *Server) error {
} }
sf := schema.NewCollection(ctx, server.BaseSchemas, asl) sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
summaryCache := summarycache.New(sf) summaryCache := summarycache.New(sf, ccache)
ccache.OnAdd(ctx, summaryCache.OnAdd) summaryCache.Start(ctx)
ccache.OnRemove(ctx, summaryCache.OnRemove)
ccache.OnChange(ctx, summaryCache.OnChange)
for _, template := range resources.DefaultSchemaTemplates(cf, summaryCache, asl, server.controllers.K8s.Discovery()) { for _, template := range resources.DefaultSchemaTemplates(cf, summaryCache, asl, server.controllers.K8s.Discovery()) {
sf.AddTemplate(template) sf.AddTemplate(template)

View File

@ -2,12 +2,12 @@ package summarycache
import ( import (
"context" "context"
"fmt"
"strings" "strings"
"sync" "sync"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/clustercache"
"github.com/rancher/steve/pkg/schema" "github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/schema/converter" "github.com/rancher/steve/pkg/schema/converter"
"github.com/rancher/wrangler/pkg/slice" "github.com/rancher/wrangler/pkg/slice"
@ -35,26 +35,39 @@ type Relationship struct {
FromType string `json:"fromType,omitempty"` FromType string `json:"fromType,omitempty"`
Rel string `json:"rel,omitempty"` Rel string `json:"rel,omitempty"`
Selector string `json:"selector,omitempty"` Selector string `json:"selector,omitempty"`
State string `json:"state,omitempty"`
Message string `json:"message,omitempty"`
Error bool `json:"error,omitempty"`
Transitioning bool `json:"transitioning,omitempty"`
} }
type SummaryCache struct { type SummaryCache struct {
sync.RWMutex sync.RWMutex
cache cache.ThreadSafeStore cache cache.ThreadSafeStore
schemas *schema.Collection schemas *schema.Collection
cbs map[int]chan *summary.Relationship clusterCache clustercache.ClusterCache
cbs map[int]chan *summary.Relationship
} }
func New(schemas *schema.Collection) *SummaryCache { func New(schemas *schema.Collection, clusterCache clustercache.ClusterCache) *SummaryCache {
indexers := cache.Indexers{} indexers := cache.Indexers{}
s := &SummaryCache{ s := &SummaryCache{
cache: cache.NewThreadSafeStore(indexers, cache.Indices{}), cache: cache.NewThreadSafeStore(indexers, cache.Indices{}),
schemas: schemas, schemas: schemas,
cbs: map[int]chan *summary.Relationship{}, clusterCache: clusterCache,
cbs: map[int]chan *summary.Relationship{},
} }
indexers[relationshipIndex] = s.relationshipIndexer indexers[relationshipIndex] = s.relationshipIndexer
return s return s
} }
func (s *SummaryCache) Start(ctx context.Context) {
s.clusterCache.OnAdd(ctx, s.OnAdd)
s.clusterCache.OnRemove(ctx, s.OnRemove)
s.clusterCache.OnChange(ctx, s.OnChange)
}
func (s *SummaryCache) OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship { func (s *SummaryCache) OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -152,19 +165,25 @@ func toSelector(sel *metav1.LabelSelector) string {
} }
func (s *SummaryCache) toRel(ns string, rel *summary.Relationship) Relationship { func (s *SummaryCache) toRel(ns string, rel *summary.Relationship) Relationship {
ns = s.resolveNamespace(ns, rel.Namespace, runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)) gvk := runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)
ns = s.resolveNamespace(ns, rel.Namespace, gvk)
id := rel.Name id := rel.Name
if id != "" && ns != "" { if id != "" && ns != "" {
id = ns + "/" + rel.Name id = ns + "/" + rel.Name
} }
obj, ok, err := s.clusterCache.Get(gvk, ns, rel.Name)
if err != nil || !ok {
obj = nil
}
if rel.Inbound { if rel.Inbound {
return Relationship{ return addObject(Relationship{
FromID: id, FromID: id,
FromType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)), FromType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)),
Rel: rel.Type, Rel: rel.Type,
} }, obj)
} }
toNS := "" toNS := ""
@ -172,13 +191,32 @@ func (s *SummaryCache) toRel(ns string, rel *summary.Relationship) Relationship
toNS = ns toNS = ns
} }
return Relationship{ return addObject(Relationship{
ToID: id, ToID: id,
ToType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)), ToType: converter.GVKToSchemaID(runtimeschema.FromAPIVersionAndKind(rel.APIVersion, rel.Kind)),
Rel: rel.Type, Rel: rel.Type,
ToNamespace: toNS, ToNamespace: toNS,
Selector: toSelector(rel.Selector), Selector: toSelector(rel.Selector),
}, obj)
}
func addObject(rel Relationship, obj interface{}) Relationship {
if obj == nil {
return rel
} }
ro, ok := obj.(runtime.Object)
if !ok {
return rel
}
summarized := summary.Summarized(ro)
rel.State = summarized.State
rel.Error = summarized.Error
rel.Message = strings.Join(summarized.Message, "; ")
rel.Transitioning = summarized.Transitioning
return rel
} }
func (s *SummaryCache) Add(obj runtime.Object) { func (s *SummaryCache) Add(obj runtime.Object) {
@ -286,17 +324,17 @@ func (s *SummaryCache) refersTo(summarized *summary.SummarizedObject, rel *summa
return summarized.Namespace == ns return summarized.Namespace == ns
} }
func (s *SummaryCache) OnAdd(gvr runtimeschema.GroupVersionResource, key string, obj runtime.Object) error { func (s *SummaryCache) OnAdd(_ runtimeschema.GroupVersionKind, key string, obj runtime.Object) error {
s.Add(obj) s.Add(obj)
return nil return nil
} }
func (s *SummaryCache) OnRemove(gvr runtimeschema.GroupVersionResource, key string, obj runtime.Object) error { func (s *SummaryCache) OnRemove(_ runtimeschema.GroupVersionKind, key string, obj runtime.Object) error {
s.Remove(obj) s.Remove(obj)
return nil return nil
} }
func (s *SummaryCache) OnChange(gvr runtimeschema.GroupVersionResource, key string, obj, oldObj runtime.Object) error { func (s *SummaryCache) OnChange(_ runtimeschema.GroupVersionKind, key string, obj, oldObj runtime.Object) error {
s.Change(obj, oldObj) s.Change(obj, oldObj)
return nil return nil
} }
@ -328,10 +366,6 @@ func toKey(obj runtime.Object) string {
return toKeyFrom(namespace, name, gvk) return toKeyFrom(namespace, name, gvk)
} }
func toRelKey(key string, index int) string {
return fmt.Sprintf("%s:%d", key, index)
}
func relEquals(left, right *summary.Relationship) bool { func relEquals(left, right *summary.Relationship) bool {
if left == nil && right == nil { if left == nil && right == nil {
return true return true