mirror of
https://github.com/rancher/steve.git
synced 2025-09-01 15:37:31 +00:00
Cleanup counts
This commit is contained in:
268
pkg/resources/counts/counts.go
Normal file
268
pkg/resources/counts/counts.go
Normal file
@@ -0,0 +1,268 @@
|
||||
package counts
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/naok/pkg/attributes"
|
||||
|
||||
"github.com/rancher/naok/pkg/accesscontrol"
|
||||
"github.com/rancher/norman/pkg/store/empty"
|
||||
"github.com/rancher/norman/pkg/types"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var (
|
||||
ignore = map[string]bool{
|
||||
"count": true,
|
||||
"schema": true,
|
||||
"apiRoot": true,
|
||||
}
|
||||
slow = map[string]bool{
|
||||
"io.k8s.api.management.cattle.io.v3.CatalogTemplateVersion": true,
|
||||
"io.k8s.api.management.cattle.io.v3.CatalogTemplate": true,
|
||||
}
|
||||
listTimeout = 1750 * time.Millisecond
|
||||
)
|
||||
|
||||
func Register(schemas *types.Schemas) {
|
||||
schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) {
|
||||
schema.CollectionMethods = []string{http.MethodGet}
|
||||
schema.ResourceMethods = []string{http.MethodGet}
|
||||
schema.Attributes["access"] = accesscontrol.AccessListMap{
|
||||
"watch": accesscontrol.AccessList{
|
||||
{
|
||||
Namespace: "*",
|
||||
ResourceName: "*",
|
||||
},
|
||||
},
|
||||
}
|
||||
schema.Store = &Store{}
|
||||
})
|
||||
}
|
||||
|
||||
type Count struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Counts map[string]ItemCount `json:"counts,omitempty"`
|
||||
}
|
||||
|
||||
type ItemCount struct {
|
||||
Count int `json:"count,omitempty"`
|
||||
Namespaces map[string]int `json:"namespaces,omitempty"`
|
||||
Revision string `json:"revision,omitempty"`
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
empty.Store
|
||||
}
|
||||
|
||||
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) {
|
||||
c, err := s.getCount(apiOp, listTimeout, true)
|
||||
return types.ToAPI(c), err
|
||||
}
|
||||
|
||||
func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) {
|
||||
c, err := s.getCount(apiOp, listTimeout, true)
|
||||
return types.ToAPI([]interface{}{c}), err
|
||||
}
|
||||
|
||||
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) {
|
||||
c, err := s.getCount(apiOp, listTimeout*10, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
ctx, cancel := context.WithCancel(apiOp.Context())
|
||||
|
||||
child := make(chan Count)
|
||||
for name, countItem := range c.Counts {
|
||||
wg.Add(1)
|
||||
name := name
|
||||
countItem := countItem
|
||||
go func() {
|
||||
s.watchItem(apiOp.WithContext(ctx), name, countItem, cancel, child)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(child)
|
||||
}()
|
||||
|
||||
result := make(chan types.APIEvent)
|
||||
go func() {
|
||||
defer close(result)
|
||||
|
||||
result <- types.APIEvent{
|
||||
Name: "resource.create",
|
||||
ResourceType: "count",
|
||||
Object: types.ToAPI(c),
|
||||
}
|
||||
|
||||
for change := range child {
|
||||
for k, v := range change.Counts {
|
||||
c.Counts[k] = v
|
||||
}
|
||||
|
||||
result <- types.APIEvent{
|
||||
Name: "resource.change",
|
||||
ResourceType: "count",
|
||||
Object: types.ToAPI(c),
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Store) watchItem(apiOp *types.APIRequest, schemaID string, start ItemCount, cancel func(), counts chan Count) {
|
||||
schema := apiOp.Schemas.Schema(schemaID)
|
||||
if schema == nil || schema.Store == nil || apiOp.AccessControl.CanWatch(apiOp, schema) != nil {
|
||||
return
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
logrus.Debugf("watching %s for count", schemaID)
|
||||
defer logrus.Debugf("close watching %s for count", schemaID)
|
||||
w, err := schema.Store.Watch(apiOp, schema, types.WatchRequest{Revision: start.Revision})
|
||||
if err != nil {
|
||||
logrus.Errorf("failed to watch %s for counts: %v", schema.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
for event := range w {
|
||||
if event.Revision == start.Revision {
|
||||
continue
|
||||
}
|
||||
|
||||
ns := types.Namespace(event.Object.Map())
|
||||
write := false
|
||||
if event.Name == "resource.create" {
|
||||
start.Count++
|
||||
write = true
|
||||
if ns != "" {
|
||||
start.Namespaces[ns]++
|
||||
}
|
||||
} else if event.Name == "resource.remove" {
|
||||
start.Count--
|
||||
write = true
|
||||
if ns != "" {
|
||||
start.Namespaces[ns]--
|
||||
}
|
||||
}
|
||||
if write {
|
||||
counts <- Count{Counts: map[string]ItemCount{
|
||||
schemaID: start,
|
||||
}}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreSlow bool) (Count, error) {
|
||||
var countLock sync.Mutex
|
||||
counts := map[string]ItemCount{}
|
||||
|
||||
errCtx, cancel := context.WithTimeout(apiOp.Context(), timeout)
|
||||
eg, errCtx := errgroup.WithContext(errCtx)
|
||||
defer cancel()
|
||||
|
||||
for _, schema := range apiOp.Schemas.Schemas() {
|
||||
if ignore[schema.ID] {
|
||||
continue
|
||||
}
|
||||
|
||||
if attributes.PreferredVersion(schema) != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if attributes.PreferredGroup(schema) != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if ignoreSlow && slow[schema.ID] {
|
||||
continue
|
||||
}
|
||||
|
||||
if schema.Store == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if apiOp.AccessControl.CanList(apiOp, schema) != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
current := schema
|
||||
eg.Go(func() error {
|
||||
list, err := current.Store.List(apiOp, current, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if list.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
itemCount := ItemCount{
|
||||
Namespaces: map[string]int{},
|
||||
Revision: list.ListRevision,
|
||||
}
|
||||
|
||||
for _, item := range list.List() {
|
||||
itemCount.Count++
|
||||
ns := types.Namespace(item)
|
||||
if ns != "" {
|
||||
itemCount.Namespaces[ns]++
|
||||
}
|
||||
}
|
||||
|
||||
countLock.Lock()
|
||||
counts[current.ID] = itemCount
|
||||
countLock.Unlock()
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
select {
|
||||
case err = <-future(eg.Wait):
|
||||
case <-errCtx.Done():
|
||||
err = errCtx.Err()
|
||||
}
|
||||
|
||||
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
|
||||
return Count{}, err
|
||||
}
|
||||
|
||||
// in the case of cancellation go routines could still be running so we copy the map
|
||||
// to avoid returning a map that might get modified
|
||||
countLock.Lock()
|
||||
result := Count{
|
||||
ID: "count",
|
||||
Counts: map[string]ItemCount{},
|
||||
}
|
||||
for k, v := range counts {
|
||||
result.Counts[k] = v
|
||||
}
|
||||
countLock.Unlock()
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func future(f func() error) chan error {
|
||||
result := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(result)
|
||||
if err := f(); err != nil {
|
||||
result <- err
|
||||
}
|
||||
}()
|
||||
return result
|
||||
}
|
Reference in New Issue
Block a user