diff --git a/pkg/counts/types.go b/pkg/counts/types.go new file mode 100644 index 0000000..563badd --- /dev/null +++ b/pkg/counts/types.go @@ -0,0 +1,114 @@ +package counts + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/rancher/norman/pkg/store/empty" + "github.com/rancher/norman/pkg/types" + "golang.org/x/sync/errgroup" +) + +var ( + ignore = map[string]bool{ + "count": true, + "schema": true, + } +) + +func Register(schemas *types.Schemas) { + schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) { + schema.CollectionMethods = []string{http.MethodGet} + schema.ResourceMethods = []string{} + schema.Store = &Store{} + }) +} + +type Count struct { + Counts map[string]ItemCount `json:"counts,omitempty"` +} + +type ItemCount struct { + Count int `json:"count,omitempty"` + Revision string `json:"revision,omitempty"` +} + +type Store struct { + empty.Store +} + +func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) { + c, err := s.getCount(apiOp, 750*time.Millisecond) + return types.ToAPI(c), err +} + +func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) { + c, err := s.getCount(apiOp, 750*time.Millisecond) + return types.ToAPI([]interface{}{c}), err +} + +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) { + return nil, nil +} + +func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, error) { + var countLock sync.Mutex + counts := map[string]ItemCount{} + + errCtx, cancel := context.WithTimeout(apiOp.Context(), timeout) + eg, errCtx := errgroup.WithContext(errCtx) + defer cancel() + + for _, schema := range apiOp.Schemas.Schemas() { + if ignore[schema.ID] { + continue + } + + if schema.Store == nil { + continue + } + + if apiOp.AccessControl.CanList(apiOp, schema) != nil { + continue + } + + current := schema + eg.Go(func() error { + list, err := current.Store.List(apiOp, current, nil) + if err != nil { + return err + } + if list.IsNil() { + return nil + } + + countLock.Lock() + counts[current.ID] = ItemCount{ + Count: len(list.List()), + Revision: list.ListRevision, + } + countLock.Unlock() + + return nil + }) + } + + if err := eg.Wait(); err != nil && err != context.Canceled { + return Count{}, err + } + + // in the case of cancellation go routines could still be running so we copy the map + // to avoid returning a map that might get modified + countLock.Lock() + result := Count{ + Counts: map[string]ItemCount{}, + } + for k, v := range counts { + result.Counts[k] = v + } + countLock.Unlock() + + return result, nil +} diff --git a/pkg/server/api.go b/pkg/server/api.go index 10634da..0901d34 100644 --- a/pkg/server/api.go +++ b/pkg/server/api.go @@ -3,6 +3,8 @@ package server import ( "net/http" + "github.com/rancher/naok/pkg/counts" + "github.com/gorilla/mux" "github.com/rancher/naok/pkg/accesscontrol" "github.com/rancher/naok/pkg/attributes" @@ -23,13 +25,17 @@ func newAPIServer(cfg *rest.Config, cf proxy.ClientGetter, as *accesscontrol.Acc ) a := &apiServer{ - Router: mux.NewRouter(), - cf: cf, - as: as, - sf: sf, - server: api.NewAPIServer(), + Router: mux.NewRouter(), + cf: cf, + as: as, + sf: sf, + server: api.NewAPIServer(), + baseSchemas: types.EmptySchemas(), } + counts.Register(a.baseSchemas) + subscribe.Register(a.baseSchemas) + a.Router.NotFoundHandler, err = k8sproxy.Handler("/", cfg) if err != nil { return nil, err @@ -42,10 +48,11 @@ func newAPIServer(cfg *rest.Config, cf proxy.ClientGetter, as *accesscontrol.Acc type apiServer struct { *mux.Router - cf proxy.ClientGetter - as *accesscontrol.AccessStore - sf schemas.SchemaFactory - server *api.Server + cf proxy.ClientGetter + as *accesscontrol.AccessStore + sf schemas.SchemaFactory + server *api.Server + baseSchemas *types.Schemas } func (a *apiServer) newSchemas() (*types.Schemas, error) { @@ -55,7 +62,7 @@ func (a *apiServer) newSchemas() (*types.Schemas, error) { } schemas.DefaultMapper = newDefaultMapper - subscribe.Register(schemas) + schemas.AddSchemas(a.baseSchemas) return schemas, nil } diff --git a/pkg/server/routes.go b/pkg/server/routes.go index fa18250..c0e2f85 100644 --- a/pkg/server/routes.go +++ b/pkg/server/routes.go @@ -15,6 +15,7 @@ func (a *apiServer) routes() error { a.Path("/v1/{type:schemas}").Handler(a.handle(nil)) a.Path("/v1/{type:schemas}/{name}").Handler(a.handle(nil)) a.Path("/v1/{type:subscribe}").Handler(a.handle(nil)) + a.Path("/v1/{type:counts}").Handler(a.handle(nil)) a.Path("/{version:v1}/{resource}").Handler(a.handle(a.k8sAPI)) a.Path("/{version:v1}/{resource}/{nameorns}").Handler(a.handle(a.k8sAPI))