diff --git a/pkg/accesscontrol/access_set.go b/pkg/accesscontrol/access_set.go index 0704a16..72b745b 100644 --- a/pkg/accesscontrol/access_set.go +++ b/pkg/accesscontrol/access_set.go @@ -146,6 +146,9 @@ func (a Access) nameOK(name string) bool { } func GetAccessListMap(s *types.Schema) AccessListMap { + if s == nil { + return nil + } v, _ := attributes.Access(s).(AccessListMap) return v } diff --git a/pkg/counts/types.go b/pkg/counts/types.go index 563badd..c64c0ea 100644 --- a/pkg/counts/types.go +++ b/pkg/counts/types.go @@ -6,8 +6,10 @@ import ( "sync" "time" + "github.com/rancher/naok/pkg/accesscontrol" "github.com/rancher/norman/pkg/store/empty" "github.com/rancher/norman/pkg/types" + "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" ) @@ -16,17 +18,31 @@ var ( "count": true, "schema": true, } + slow = map[string]bool{ + "io.k8s.api.management.cattle.io.v3.CatalogTemplateVersion": true, + "io.k8s.api.management.cattle.io.v3.CatalogTemplate": true, + } + listTimeout = 1750 * time.Millisecond ) func Register(schemas *types.Schemas) { schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) { schema.CollectionMethods = []string{http.MethodGet} schema.ResourceMethods = []string{} + schema.Attributes["access"] = accesscontrol.AccessListMap{ + "watch": accesscontrol.AccessList{ + { + Namespace: "*", + ResourceName: "*", + }, + }, + } schema.Store = &Store{} }) } type Count struct { + ID string `json:"id,omitempty"` Counts map[string]ItemCount `json:"counts,omitempty"` } @@ -40,20 +56,103 @@ type Store struct { } func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) { - c, err := s.getCount(apiOp, 750*time.Millisecond) + c, err := s.getCount(apiOp, listTimeout, true) return types.ToAPI(c), err } func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) { - c, err := s.getCount(apiOp, 750*time.Millisecond) + c, err := s.getCount(apiOp, listTimeout, true) return types.ToAPI([]interface{}{c}), err } func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) { - return nil, nil + c, err := s.getCount(apiOp, listTimeout*10, false) + if err != nil { + return nil, err + } + + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(apiOp.Context()) + + child := make(chan Count) + for name, countItem := range c.Counts { + wg.Add(1) + go func() { + s.watchItem(apiOp.WithContext(ctx), name, countItem.Count, countItem.Revision, cancel, child) + wg.Done() + }() + } + + go func() { + wg.Wait() + close(child) + }() + + result := make(chan types.APIEvent) + go func() { + defer close(result) + + result <- types.APIEvent{ + Name: "resource.create", + ResourceType: "count", + Object: types.ToAPI(c), + } + + for change := range child { + for k, v := range change.Counts { + c.Counts[k] = v + } + + result <- types.APIEvent{ + Name: "resource.change", + ResourceType: "count", + Object: types.ToAPI(c), + } + } + }() + + return result, nil } -func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, error) { +func (s *Store) watchItem(apiOp *types.APIRequest, schemaID string, start int, revision string, cancel func(), counts chan Count) { + schema := apiOp.Schemas.Schema(schemaID) + if schema == nil || schema.Store == nil || apiOp.AccessControl.CanWatch(apiOp, schema) != nil { + return + } + + defer cancel() + + w, err := schema.Store.Watch(apiOp, schema, types.WatchRequest{Revision: revision}) + if err != nil { + logrus.Errorf("failed to watch %s for counts: %v", schema.ID, err) + return + } + + for event := range w { + if event.Revision == revision { + continue + } + + write := false + if event.Name == "resource.create" { + start++ + write = true + } else if event.Name == "resource.remove" { + start-- + write = true + } + if write { + counts <- Count{Counts: map[string]ItemCount{ + schemaID: { + Count: start, + Revision: event.Revision, + }, + }} + } + } +} + +func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreSlow bool) (Count, error) { var countLock sync.Mutex counts := map[string]ItemCount{} @@ -66,6 +165,10 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, continue } + if ignoreSlow && slow[schema.ID] { + continue + } + if schema.Store == nil { continue } @@ -95,7 +198,17 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, }) } - if err := eg.Wait(); err != nil && err != context.Canceled { + var ( + err error + ) + + select { + case err = <-future(eg.Wait): + case <-errCtx.Done(): + err = errCtx.Err() + } + + if err != nil && err != context.Canceled && err != context.DeadlineExceeded { return Count{}, err } @@ -103,6 +216,7 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, // to avoid returning a map that might get modified countLock.Lock() result := Count{ + ID: "count", Counts: map[string]ItemCount{}, } for k, v := range counts { @@ -112,3 +226,14 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, return result, nil } + +func future(f func() error) chan error { + result := make(chan error, 1) + go func() { + defer close(result) + if err := f(); err != nil { + result <- err + } + }() + return result +}