1
0
mirror of https://github.com/rancher/steve.git synced 2025-05-03 13:36:42 +00:00
steve/pkg/resources/counts/counts.go

269 lines
5.7 KiB
Go
Raw Normal View History

2019-08-12 22:15:19 +00:00
package counts
import (
"context"
"net/http"
"sync"
"time"
2019-08-16 18:40:42 +00:00
"github.com/rancher/naok/pkg/attributes"
2019-08-12 23:47:23 +00:00
"github.com/rancher/naok/pkg/accesscontrol"
2019-08-12 22:15:19 +00:00
"github.com/rancher/norman/pkg/store/empty"
"github.com/rancher/norman/pkg/types"
2019-08-12 23:47:23 +00:00
"github.com/sirupsen/logrus"
2019-08-12 22:15:19 +00:00
"golang.org/x/sync/errgroup"
)
// ignore lists the schema IDs that getCount never counts.
var ignore = map[string]bool{
	"count":   true,
	"schema":  true,
	"apiRoot": true,
}

// slow lists the schema IDs that getCount skips when it is called with
// ignoreSlow set to true.
var slow = map[string]bool{
	"io.k8s.api.management.cattle.io.v3.CatalogTemplateVersion": true,
	"io.k8s.api.management.cattle.io.v3.CatalogTemplate":        true,
}

// listTimeout is the base time budget handed to getCount; Watch multiplies
// it by ten for its initial snapshot.
var listTimeout = 1750 * time.Millisecond
func Register(schemas *types.Schemas) {
schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) {
schema.CollectionMethods = []string{http.MethodGet}
2019-08-13 04:32:57 +00:00
schema.ResourceMethods = []string{http.MethodGet}
2019-08-12 23:47:23 +00:00
schema.Attributes["access"] = accesscontrol.AccessListMap{
"watch": accesscontrol.AccessList{
{
Namespace: "*",
ResourceName: "*",
},
},
}
2019-08-12 22:15:19 +00:00
schema.Store = &Store{}
})
}
type Count struct {
2019-08-12 23:47:23 +00:00
ID string `json:"id,omitempty"`
2019-08-12 22:15:19 +00:00
Counts map[string]ItemCount `json:"counts,omitempty"`
}
// ItemCount holds the totals for a single schema.
type ItemCount struct {
	// Count is the total number of listed items.
	Count int `json:"count,omitempty"`
	// Namespaces holds the number of items per non-empty namespace.
	Namespaces map[string]int `json:"namespaces,omitempty"`
	// Revision is the list revision reported when the items were counted;
	// watchItem uses it as the starting point of its watch.
	Revision string `json:"revision,omitempty"`
}
// Store serves the Count resource. It embeds empty.Store and defines its own
// ByID, List and Watch methods.
type Store struct {
	empty.Store
}
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) {
2019-08-12 23:47:23 +00:00
c, err := s.getCount(apiOp, listTimeout, true)
2019-08-12 22:15:19 +00:00
return types.ToAPI(c), err
}
func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) {
2019-08-12 23:47:23 +00:00
c, err := s.getCount(apiOp, listTimeout, true)
2019-08-12 22:15:19 +00:00
return types.ToAPI([]interface{}{c}), err
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) {
2019-08-12 23:47:23 +00:00
c, err := s.getCount(apiOp, listTimeout*10, false)
if err != nil {
return nil, err
}
wg := sync.WaitGroup{}
ctx, cancel := context.WithCancel(apiOp.Context())
child := make(chan Count)
for name, countItem := range c.Counts {
wg.Add(1)
2019-08-16 18:40:42 +00:00
name := name
countItem := countItem
2019-08-12 23:47:23 +00:00
go func() {
2019-08-13 04:32:57 +00:00
s.watchItem(apiOp.WithContext(ctx), name, countItem, cancel, child)
2019-08-12 23:47:23 +00:00
wg.Done()
}()
}
go func() {
wg.Wait()
close(child)
}()
result := make(chan types.APIEvent)
go func() {
defer close(result)
result <- types.APIEvent{
Name: "resource.create",
ResourceType: "count",
Object: types.ToAPI(c),
}
for change := range child {
for k, v := range change.Counts {
c.Counts[k] = v
}
result <- types.APIEvent{
Name: "resource.change",
ResourceType: "count",
Object: types.ToAPI(c),
}
}
}()
return result, nil
}
2019-08-13 04:32:57 +00:00
func (s *Store) watchItem(apiOp *types.APIRequest, schemaID string, start ItemCount, cancel func(), counts chan Count) {
2019-08-12 23:47:23 +00:00
schema := apiOp.Schemas.Schema(schemaID)
if schema == nil || schema.Store == nil || apiOp.AccessControl.CanWatch(apiOp, schema) != nil {
return
}
defer cancel()
2019-08-16 18:40:42 +00:00
logrus.Debugf("watching %s for count", schemaID)
defer logrus.Debugf("close watching %s for count", schemaID)
2019-08-13 04:32:57 +00:00
w, err := schema.Store.Watch(apiOp, schema, types.WatchRequest{Revision: start.Revision})
2019-08-12 23:47:23 +00:00
if err != nil {
logrus.Errorf("failed to watch %s for counts: %v", schema.ID, err)
return
}
for event := range w {
2019-08-13 04:32:57 +00:00
if event.Revision == start.Revision {
2019-08-12 23:47:23 +00:00
continue
}
2019-08-13 04:32:57 +00:00
ns := types.Namespace(event.Object.Map())
2019-08-12 23:47:23 +00:00
write := false
if event.Name == "resource.create" {
2019-08-13 04:32:57 +00:00
start.Count++
2019-08-12 23:47:23 +00:00
write = true
2019-08-13 04:32:57 +00:00
if ns != "" {
start.Namespaces[ns]++
}
2019-08-12 23:47:23 +00:00
} else if event.Name == "resource.remove" {
2019-08-13 04:32:57 +00:00
start.Count--
2019-08-12 23:47:23 +00:00
write = true
2019-08-13 04:32:57 +00:00
if ns != "" {
start.Namespaces[ns]--
}
2019-08-12 23:47:23 +00:00
}
if write {
counts <- Count{Counts: map[string]ItemCount{
2019-08-13 04:32:57 +00:00
schemaID: start,
2019-08-12 23:47:23 +00:00
}}
}
}
2019-08-12 22:15:19 +00:00
}
2019-08-12 23:47:23 +00:00
func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreSlow bool) (Count, error) {
2019-08-12 22:15:19 +00:00
var countLock sync.Mutex
counts := map[string]ItemCount{}
errCtx, cancel := context.WithTimeout(apiOp.Context(), timeout)
eg, errCtx := errgroup.WithContext(errCtx)
defer cancel()
for _, schema := range apiOp.Schemas.Schemas() {
if ignore[schema.ID] {
continue
}
2019-08-16 18:40:42 +00:00
if attributes.PreferredVersion(schema) != "" {
continue
}
if attributes.PreferredGroup(schema) != "" {
continue
}
2019-08-12 23:47:23 +00:00
if ignoreSlow && slow[schema.ID] {
continue
}
2019-08-12 22:15:19 +00:00
if schema.Store == nil {
continue
}
if apiOp.AccessControl.CanList(apiOp, schema) != nil {
continue
}
current := schema
eg.Go(func() error {
list, err := current.Store.List(apiOp, current, nil)
if err != nil {
return err
}
if list.IsNil() {
return nil
}
2019-08-13 04:32:57 +00:00
itemCount := ItemCount{
Namespaces: map[string]int{},
Revision: list.ListRevision,
2019-08-12 22:15:19 +00:00
}
2019-08-13 04:32:57 +00:00
for _, item := range list.List() {
itemCount.Count++
ns := types.Namespace(item)
if ns != "" {
itemCount.Namespaces[ns]++
}
}
countLock.Lock()
counts[current.ID] = itemCount
2019-08-12 22:15:19 +00:00
countLock.Unlock()
return nil
})
}
2019-08-12 23:47:23 +00:00
var (
err error
)
select {
case err = <-future(eg.Wait):
case <-errCtx.Done():
err = errCtx.Err()
}
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
2019-08-12 22:15:19 +00:00
return Count{}, err
}
// in the case of cancellation go routines could still be running so we copy the map
// to avoid returning a map that might get modified
countLock.Lock()
result := Count{
2019-08-12 23:47:23 +00:00
ID: "count",
2019-08-12 22:15:19 +00:00
Counts: map[string]ItemCount{},
}
for k, v := range counts {
result.Counts[k] = v
}
countLock.Unlock()
return result, nil
}
2019-08-12 23:47:23 +00:00
// future runs f in its own goroutine and returns a channel reporting the
// outcome. A non-nil error from f is delivered on the channel before it is
// closed; when f succeeds the channel is simply closed, so a receive yields
// nil. The channel has room for one value, which lets the goroutine finish
// even if nobody ever receives from it.
func future(f func() error) chan error {
	outcome := make(chan error, 1)

	go func() {
		defer close(outcome)

		err := f()
		if err == nil {
			return
		}
		outcome <- err
	}()

	return outcome
}