// Source: steve/pkg/stores/partition/parallel.go, from a mirror of
// https://github.com/rancher/steve.git (synced 2025-05-03 13:36:42 +00:00).

package partition

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"sync/atomic"

	"github.com/rancher/apiserver/pkg/types"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// Partition represents a named grouping of kubernetes resources,
// such as by namespace or a set of names.
type Partition interface {
	// Name identifies the partition. ParallelPartitionLister stores it in
	// continue tokens and matches on it to resume at the right partition.
	Name() string
}
// ParallelPartitionLister defines how a set of partitions will be queried.
2020-02-10 17:18:20 +00:00
type ParallelPartitionLister struct {
// Lister is the lister method for a single partition.
Lister PartitionLister
// Concurrency is the weight of the semaphore.
2020-02-10 17:18:20 +00:00
Concurrency int64
// Partitions is the set of partitions that will be concurrently queried.
Partitions []Partition
state *listState
revision string
err error
2020-02-10 17:18:20 +00:00
}
// PartitionLister lists objects for one partition.
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, []types.Warning, error)
2020-02-10 17:18:20 +00:00
// Err returns the latest error encountered.
2020-02-10 17:18:20 +00:00
func (p *ParallelPartitionLister) Err() error {
return p.err
}
// Revision returns the revision for the current list state.
2020-02-10 17:18:20 +00:00
func (p *ParallelPartitionLister) Revision() string {
return p.revision
}
// Continue returns the encoded continue token based on the current list state.
2020-02-10 17:18:20 +00:00
func (p *ParallelPartitionLister) Continue() string {
if p.state == nil {
return ""
}
bytes, err := json.Marshal(p.state)
if err != nil {
return ""
}
return base64.StdEncoding.EncodeToString(bytes)
}
2020-06-05 20:30:33 +00:00
func indexOrZero(partitions []Partition, name string) int {
if name == "" {
2020-02-10 17:18:20 +00:00
return 0
}
for i, partition := range partitions {
2020-06-05 20:30:33 +00:00
if partition.Name() == name {
2020-02-10 17:18:20 +00:00
return i
}
}
return 0
}
// List returns a stream of objects up to the requested limit.
// If the continue token is not empty, it decodes it and returns the stream
// starting at the indicated marker.
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume, revision string) (<-chan []unstructured.Unstructured, error) {
2020-02-10 17:18:20 +00:00
var state listState
if resume != "" {
bytes, err := base64.StdEncoding.DecodeString(resume)
if err != nil {
return nil, err
}
if err := json.Unmarshal(bytes, &state); err != nil {
return nil, err
}
if state.Limit > 0 {
limit = state.Limit
}
} else {
state.Revision = revision
2020-02-10 17:18:20 +00:00
}
result := make(chan []unstructured.Unstructured)
2020-02-10 17:18:20 +00:00
go p.feeder(ctx, state, limit, result)
return result, nil
}
// listState is a representation of the continuation point for a partial list.
// It is marshalled to JSON and base64-encoded to form the continue token
// returned in the response; empty fields are omitted from the JSON.
type listState struct {
	// Revision is the resourceVersion for the List object.
	Revision string `json:"r,omitempty"`
	// PartitionName is the name of the partition to resume listing from.
	PartitionName string `json:"p,omitempty"`
	// Continue is the continue token returned from Kubernetes for a partially filled list request.
	// It is a subfield of the continue token returned from steve.
	Continue string `json:"c,omitempty"`
	// Offset is the offset from the start of the list within the partition to begin the result list.
	Offset int `json:"o,omitempty"`
	// Limit is the maximum number of items from all partitions to return in the result.
	Limit int `json:"l,omitempty"`
}
// feeder spawns a goroutine to list resources in each partition and feeds the
// results, in order by partition index, into a channel.
// If the sum of the results from all partitions (by namespaces or names) is
// greater than the limit parameter from the user request or the default of
// 100000, the result is truncated and a continue token is generated that
// indicates the partition and offset for the client to start on in the next
// request.
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []unstructured.Unstructured) {
2020-02-10 17:18:20 +00:00
var (
sem = semaphore.NewWeighted(p.Concurrency)
capacity = limit
last chan struct{}
)
eg, ctx := errgroup.WithContext(ctx)
defer func() {
err := eg.Wait()
if p.err == nil {
p.err = err
}
close(result)
}()
2020-06-05 20:30:33 +00:00
for i := indexOrZero(p.Partitions, state.PartitionName); i < len(p.Partitions); i++ {
if (limit > 0 && capacity <= 0) || isDone(ctx) {
2020-02-10 17:18:20 +00:00
break
}
var (
partition = p.Partitions[i]
tickets = int64(1)
turn = last
next = make(chan struct{})
)
// setup a linked list of channel to control insertion order
last = next
// state.Revision is decoded from the continue token, there won't be a revision on the first request.
2020-02-10 17:18:20 +00:00
if state.Revision == "" {
// don't have a revision yet so grab all tickets to set a revision
tickets = 3
}
if err := sem.Acquire(ctx, tickets); err != nil {
p.err = err
break
}
// make state local for this partition
2020-02-10 17:18:20 +00:00
state := state
eg.Go(func() error {
defer sem.Release(tickets)
defer close(next)
for {
cont := ""
2020-06-05 20:30:33 +00:00
if partition.Name() == state.PartitionName {
2020-02-10 17:18:20 +00:00
cont = state.Continue
}
list, _, err := p.Lister(ctx, partition, cont, state.Revision, limit)
2020-02-10 17:18:20 +00:00
if err != nil {
return err
}
waitForTurn(ctx, turn)
if p.state != nil {
return nil
}
if state.Revision == "" {
state.Revision = list.GetResourceVersion()
2020-02-10 17:18:20 +00:00
}
if p.revision == "" {
p.revision = list.GetResourceVersion()
2020-02-10 17:18:20 +00:00
}
// We have already seen the first objects in the list, truncate up to the offset.
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Items) {
list.Items = list.Items[state.Offset:]
2020-02-10 17:18:20 +00:00
}
// Case 1: the capacity has been reached across all goroutines but the list is still only partial,
// so save the state so that the next page can be requested later.
if limit > 0 && len(list.Items) > capacity {
result <- list.Items[:capacity]
2020-02-10 17:18:20 +00:00
// save state to redo this list at this offset
p.state = &listState{
Revision: list.GetResourceVersion(),
2020-06-05 20:30:33 +00:00
PartitionName: partition.Name(),
Continue: cont,
Offset: capacity,
Limit: limit,
2020-02-10 17:18:20 +00:00
}
capacity = 0
return nil
}
result <- list.Items
capacity -= len(list.Items)
// Case 2: all objects have been returned, we are done.
if list.GetContinue() == "" {
return nil
}
// Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
// We're not yet up to capacity and have not retrieved every object,
// so loop again and get more data.
state.Continue = list.GetContinue()
state.PartitionName = partition.Name()
state.Offset = 0
2020-02-10 17:18:20 +00:00
}
})
}
p.err = eg.Wait()
}
// waitForTurn blocks until the predecessor's turn channel is closed or ctx is
// done, whichever happens first.
func waitForTurn(ctx context.Context, turn chan struct{}) {
	// The first partition has no predecessor (turn is nil). Selecting on a nil
	// channel would leave only ctx.Done() able to fire, so skip the wait.
	if turn != nil {
		select {
		case <-ctx.Done():
		case <-turn:
		}
	}
}
// isDone reports, without blocking, whether ctx has already been cancelled or
// has passed its deadline. Per the context.Context contract, Err is non-nil
// exactly when Done has been closed.
func isDone(ctx context.Context) bool {
	return ctx.Err() != nil
}