steve/pkg/stores/partition/store.go
2021-07-23 23:45:13 -07:00

178 lines
4.3 KiB
Go

package partition
import (
"context"
"net/http"
"strconv"
"github.com/rancher/apiserver/pkg/types"
"golang.org/x/sync/errgroup"
)
type Partitioner interface {
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error)
}
type Store struct {
Partitioner Partitioner
}
func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) {
p, err := s.Partitioner.Lookup(apiOp, schema, verb, id)
if err != nil {
return nil, err
}
return s.Partitioner.Store(apiOp, p)
}
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "delete", id)
if err != nil {
return types.APIObject{}, err
}
return target.Delete(apiOp, schema, id)
}
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "get", id)
if err != nil {
return types.APIObject{}, err
}
return target.ByID(apiOp, schema, id)
}
func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
cont string, revision string, limit int) (types.APIObjectList, error) {
store, err := s.Partitioner.Store(apiOp, partition)
if err != nil {
return types.APIObjectList{}, err
}
req := apiOp.Clone()
req.Request = req.Request.Clone(ctx)
values := req.Request.URL.Query()
values.Set("continue", cont)
values.Set("revision", revision)
if limit > 0 {
values.Set("limit", strconv.Itoa(limit))
} else {
values.Del("limit")
}
req.Request.URL.RawQuery = values.Encode()
return store.List(req, schema)
}
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
var (
result types.APIObjectList
)
paritions, err := s.Partitioner.All(apiOp, schema, "list", "")
if err != nil {
return result, err
}
lister := ParallelPartitionLister{
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) {
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
},
Concurrency: 3,
Partitions: paritions,
}
resume := apiOp.Request.URL.Query().Get("continue")
limit := getLimit(apiOp.Request)
list, err := lister.List(apiOp.Context(), limit, resume)
if err != nil {
return result, err
}
for items := range list {
result.Objects = append(result.Objects, items...)
}
result.Revision = lister.Revision()
result.Continue = lister.Continue()
return result, lister.Err()
}
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "create", "")
if err != nil {
return types.APIObject{}, err
}
return target.Create(apiOp, schema, data)
}
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "update", id)
if err != nil {
return types.APIObject{}, err
}
return target.Update(apiOp, schema, data, id)
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
partitions, err := s.Partitioner.All(apiOp, schema, "watch", wr.ID)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(apiOp.Context())
apiOp = apiOp.Clone().WithContext(ctx)
eg := errgroup.Group{}
response := make(chan types.APIEvent)
for _, partition := range partitions {
store, err := s.Partitioner.Store(apiOp, partition)
if err != nil {
cancel()
return nil, err
}
eg.Go(func() error {
defer cancel()
c, err := store.Watch(apiOp, schema, wr)
if err != nil {
return err
}
for i := range c {
response <- i
}
return nil
})
}
go func() {
defer close(response)
<-ctx.Done()
eg.Wait()
cancel()
}()
return response, nil
}
func getLimit(req *http.Request) int {
limitString := req.URL.Query().Get("limit")
limit, err := strconv.Atoi(limitString)
if err != nil {
limit = 0
}
if limit <= 0 {
limit = 100000
}
return limit
}