docs: Add docs for partition and proxy store

This commit is contained in:
Colleen Murphy
2022-10-14 16:17:32 -07:00
parent ea61c2187a
commit b16d502fc7
6 changed files with 111 additions and 16 deletions

View File

@@ -10,29 +10,42 @@ import (
"golang.org/x/sync/semaphore"
)
// Partition represents a named grouping of kubernetes resources,
// such as by namespace or a set of names.
type Partition interface {
Name() string
}
// ParallelPartitionLister defines how a set of partitions will be queried.
type ParallelPartitionLister struct {
Lister PartitionLister
// Lister is the lister method for a single partition.
Lister PartitionLister
// Concurrency is the weight of the semaphore.
Concurrency int64
Partitions []Partition
state *listState
revision string
err error
// Partitions is the set of partitions that will be concurrently queried.
Partitions []Partition
state *listState
revision string
err error
}
// PartitionLister lists objects for one partition.
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error)
// Err returns the latest error encountered.
func (p *ParallelPartitionLister) Err() error {
return p.err
}
// Revision returns the revision for the current list state.
func (p *ParallelPartitionLister) Revision() string {
return p.revision
}
// Continue returns the encoded continue token based on the current list state.
func (p *ParallelPartitionLister) Continue() string {
if p.state == nil {
return ""
@@ -56,6 +69,9 @@ func indexOrZero(partitions []Partition, name string) int {
return 0
}
// List returns a stream of objects up to the requested limit.
// If the continue token is not empty, it decodes it and returns the stream
// starting at the indicated marker.
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) {
var state listState
if resume != "" {
@@ -77,14 +93,33 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
return result, nil
}
// listState is a representation of the continuation point for a partial list.
// It is encoded as the continue token in the returned response.
type listState struct {
Revision string `json:"r,omitempty"`
// Revision is the resourceVersion for the List object.
Revision string `json:"r,omitempty"`
// PartitionName is the name of the partition.
PartitionName string `json:"p,omitempty"`
Continue string `json:"c,omitempty"`
Offset int `json:"o,omitempty"`
Limit int `json:"l,omitempty"`
// Continue is the continue token returned from Kubernetes for a partially filled list request.
// It is a subfield of the continue token returned from steve.
Continue string `json:"c,omitempty"`
// Offset is the offset from the start of the list within the partition to begin the result list.
Offset int `json:"o,omitempty"`
// Limit is the maximum number of items from all partitions to return in the result.
Limit int `json:"l,omitempty"`
}
// feeder spawns a goroutine to list resources in each partition and feeds the
// results, in order by partition index, into a channel.
// If the sum of the results from all partitions (by namespaces or names) is
// greater than the limit parameter from the user request or the default of
// 100000, the result is truncated and a continue token is generated that
// indicates the partition and offset for the client to start on in the next
// request.
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) {
var (
sem = semaphore.NewWeighted(p.Concurrency)
@@ -116,6 +151,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
// setup a linked list of channel to control insertion order
last = next
// state.Revision is decoded from the continue token, there won't be a revision on the first request.
if state.Revision == "" {
// don't have a revision yet so grab all tickets to set a revision
tickets = 3
@@ -125,7 +161,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
break
}
// make state local
// make state local for this partition
state := state
eg.Go(func() error {
defer sem.Release(tickets)
@@ -154,10 +190,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
p.revision = list.Revision
}
// We have already seen the first objects in the list, truncate up to the offset.
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) {
list.Objects = list.Objects[state.Offset:]
}
// Case 1: the capacity has been reached across all goroutines but the list is still only partial,
// so save the state so that the next page can be requested later.
if len(list.Objects) > capacity {
result <- list.Objects[:capacity]
// save state to redo this list at this offset
@@ -173,10 +212,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
}
result <- list.Objects
capacity -= len(list.Objects)
// Case 2: all objects have been returned, we are done.
if list.Continue == "" {
return nil
}
// loop again and get more data
// Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
// We're not yet up to capacity and have not retrieved every object,
// so loop again and get more data.
state.Continue = list.Continue
state.PartitionName = partition.Name()
state.Offset = 0

View File

@@ -9,12 +9,16 @@ import (
"golang.org/x/sync/errgroup"
)
const defaultLimit = 100000
// Partitioner is an interface for interacting with partitions.
type Partitioner interface {
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error)
}
// Store implements types.Store for partitions.
type Store struct {
Partitioner Partitioner
}
@@ -28,6 +32,7 @@ func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb,
return s.Partitioner.Store(apiOp, p)
}
// Delete deletes an object from a store.
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "delete", id)
if err != nil {
@@ -37,6 +42,7 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return target.Delete(apiOp, schema, id)
}
// ByID looks up a single object by its ID.
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "get", id)
if err != nil {
@@ -69,12 +75,14 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche
return store.List(req, schema)
}
// List returns a list of objects across all applicable partitions.
// If pagination parameters are used, it returns a segment of the list.
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
var (
result types.APIObjectList
)
paritions, err := s.Partitioner.All(apiOp, schema, "list", "")
partitions, err := s.Partitioner.All(apiOp, schema, "list", "")
if err != nil {
return result, err
}
@@ -84,7 +92,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
},
Concurrency: 3,
Partitions: paritions,
Partitions: partitions,
}
resume := apiOp.Request.URL.Query().Get("continue")
@@ -104,6 +112,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
return result, lister.Err()
}
// Create creates a single object in the store.
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "create", "")
if err != nil {
@@ -113,6 +122,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
return target.Create(apiOp, schema, data)
}
// Update updates a single object in the store.
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "update", id)
if err != nil {
@@ -122,6 +132,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
return target.Update(apiOp, schema, data, id)
}
// Watch returns a channel of events for a list or resource.
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
partitions, err := s.Partitioner.All(apiOp, schema, "watch", wr.ID)
if err != nil {
@@ -164,6 +175,9 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
return response, nil
}
// getLimit extracts the limit parameter from the request or sets a default of 100000.
// Since a default is always set, this implies that clients must always be
// aware that the list may be incomplete.
func getLimit(req *http.Request) int {
limitString := req.URL.Query().Get("limit")
limit, err := strconv.Atoi(limitString)
@@ -171,7 +185,7 @@ func getLimit(req *http.Request) int {
limit = 0
}
if limit <= 0 {
limit = 100000
limit = defaultLimit
}
return limit
}