mirror of
https://github.com/niusmallnan/steve.git
synced 2025-09-01 05:09:28 +00:00
Watch counts
This commit is contained in:
@@ -146,6 +146,9 @@ func (a Access) nameOK(name string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func GetAccessListMap(s *types.Schema) AccessListMap {
|
func GetAccessListMap(s *types.Schema) AccessListMap {
|
||||||
|
if s == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
v, _ := attributes.Access(s).(AccessListMap)
|
v, _ := attributes.Access(s).(AccessListMap)
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
@@ -6,8 +6,10 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/rancher/naok/pkg/accesscontrol"
|
||||||
"github.com/rancher/norman/pkg/store/empty"
|
"github.com/rancher/norman/pkg/store/empty"
|
||||||
"github.com/rancher/norman/pkg/types"
|
"github.com/rancher/norman/pkg/types"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -16,17 +18,31 @@ var (
|
|||||||
"count": true,
|
"count": true,
|
||||||
"schema": true,
|
"schema": true,
|
||||||
}
|
}
|
||||||
|
slow = map[string]bool{
|
||||||
|
"io.k8s.api.management.cattle.io.v3.CatalogTemplateVersion": true,
|
||||||
|
"io.k8s.api.management.cattle.io.v3.CatalogTemplate": true,
|
||||||
|
}
|
||||||
|
listTimeout = 1750 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
func Register(schemas *types.Schemas) {
|
func Register(schemas *types.Schemas) {
|
||||||
schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) {
|
schemas.MustImportAndCustomize(Count{}, func(schema *types.Schema) {
|
||||||
schema.CollectionMethods = []string{http.MethodGet}
|
schema.CollectionMethods = []string{http.MethodGet}
|
||||||
schema.ResourceMethods = []string{}
|
schema.ResourceMethods = []string{}
|
||||||
|
schema.Attributes["access"] = accesscontrol.AccessListMap{
|
||||||
|
"watch": accesscontrol.AccessList{
|
||||||
|
{
|
||||||
|
Namespace: "*",
|
||||||
|
ResourceName: "*",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
schema.Store = &Store{}
|
schema.Store = &Store{}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type Count struct {
|
type Count struct {
|
||||||
|
ID string `json:"id,omitempty"`
|
||||||
Counts map[string]ItemCount `json:"counts,omitempty"`
|
Counts map[string]ItemCount `json:"counts,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,20 +56,103 @@ type Store struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) {
|
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (types.APIObject, error) {
|
||||||
c, err := s.getCount(apiOp, 750*time.Millisecond)
|
c, err := s.getCount(apiOp, listTimeout, true)
|
||||||
return types.ToAPI(c), err
|
return types.ToAPI(c), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) {
|
func (s *Store) List(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (types.APIObject, error) {
|
||||||
c, err := s.getCount(apiOp, 750*time.Millisecond)
|
c, err := s.getCount(apiOp, listTimeout, true)
|
||||||
return types.ToAPI([]interface{}{c}), err
|
return types.ToAPI([]interface{}{c}), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) {
|
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) {
|
||||||
return nil, nil
|
c, err := s.getCount(apiOp, listTimeout*10, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
ctx, cancel := context.WithCancel(apiOp.Context())
|
||||||
|
|
||||||
|
child := make(chan Count)
|
||||||
|
for name, countItem := range c.Counts {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
s.watchItem(apiOp.WithContext(ctx), name, countItem.Count, countItem.Revision, cancel, child)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(child)
|
||||||
|
}()
|
||||||
|
|
||||||
|
result := make(chan types.APIEvent)
|
||||||
|
go func() {
|
||||||
|
defer close(result)
|
||||||
|
|
||||||
|
result <- types.APIEvent{
|
||||||
|
Name: "resource.create",
|
||||||
|
ResourceType: "count",
|
||||||
|
Object: types.ToAPI(c),
|
||||||
|
}
|
||||||
|
|
||||||
|
for change := range child {
|
||||||
|
for k, v := range change.Counts {
|
||||||
|
c.Counts[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
result <- types.APIEvent{
|
||||||
|
Name: "resource.change",
|
||||||
|
ResourceType: "count",
|
||||||
|
Object: types.ToAPI(c),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count, error) {
|
func (s *Store) watchItem(apiOp *types.APIRequest, schemaID string, start int, revision string, cancel func(), counts chan Count) {
|
||||||
|
schema := apiOp.Schemas.Schema(schemaID)
|
||||||
|
if schema == nil || schema.Store == nil || apiOp.AccessControl.CanWatch(apiOp, schema) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
w, err := schema.Store.Watch(apiOp, schema, types.WatchRequest{Revision: revision})
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("failed to watch %s for counts: %v", schema.ID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for event := range w {
|
||||||
|
if event.Revision == revision {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
write := false
|
||||||
|
if event.Name == "resource.create" {
|
||||||
|
start++
|
||||||
|
write = true
|
||||||
|
} else if event.Name == "resource.remove" {
|
||||||
|
start--
|
||||||
|
write = true
|
||||||
|
}
|
||||||
|
if write {
|
||||||
|
counts <- Count{Counts: map[string]ItemCount{
|
||||||
|
schemaID: {
|
||||||
|
Count: start,
|
||||||
|
Revision: event.Revision,
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration, ignoreSlow bool) (Count, error) {
|
||||||
var countLock sync.Mutex
|
var countLock sync.Mutex
|
||||||
counts := map[string]ItemCount{}
|
counts := map[string]ItemCount{}
|
||||||
|
|
||||||
@@ -66,6 +165,10 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ignoreSlow && slow[schema.ID] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if schema.Store == nil {
|
if schema.Store == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -95,7 +198,17 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count,
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := eg.Wait(); err != nil && err != context.Canceled {
|
var (
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err = <-future(eg.Wait):
|
||||||
|
case <-errCtx.Done():
|
||||||
|
err = errCtx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil && err != context.Canceled && err != context.DeadlineExceeded {
|
||||||
return Count{}, err
|
return Count{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -103,6 +216,7 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count,
|
|||||||
// to avoid returning a map that might get modified
|
// to avoid returning a map that might get modified
|
||||||
countLock.Lock()
|
countLock.Lock()
|
||||||
result := Count{
|
result := Count{
|
||||||
|
ID: "count",
|
||||||
Counts: map[string]ItemCount{},
|
Counts: map[string]ItemCount{},
|
||||||
}
|
}
|
||||||
for k, v := range counts {
|
for k, v := range counts {
|
||||||
@@ -112,3 +226,14 @@ func (s *Store) getCount(apiOp *types.APIRequest, timeout time.Duration) (Count,
|
|||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func future(f func() error) chan error {
|
||||||
|
result := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
defer close(result)
|
||||||
|
if err := f(); err != nil {
|
||||||
|
result <- err
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user