From b16d502fc79b8071f7dc56e8a77c6086a97e8a6f Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Fri, 14 Oct 2022 16:17:32 -0700 Subject: [PATCH] docs: Add docs for partition and proxy store --- pkg/stores/partition/parallel.go | 64 +++++++++++++++++++++++++------ pkg/stores/partition/store.go | 20 ++++++++-- pkg/stores/proxy/error_wrapper.go | 8 +++- pkg/stores/proxy/proxy_store.go | 22 +++++++++++ pkg/stores/proxy/rbac_store.go | 11 ++++++ pkg/stores/proxy/watch_refresh.go | 2 + 6 files changed, 111 insertions(+), 16 deletions(-) diff --git a/pkg/stores/partition/parallel.go b/pkg/stores/partition/parallel.go index 1b77486..4e9f3e5 100644 --- a/pkg/stores/partition/parallel.go +++ b/pkg/stores/partition/parallel.go @@ -10,29 +10,42 @@ import ( "golang.org/x/sync/semaphore" ) +// Partition represents a named grouping of kubernetes resources, +// such as by namespace or a set of names. type Partition interface { Name() string } +// ParallelPartitionLister defines how a set of partitions will be queried. type ParallelPartitionLister struct { - Lister PartitionLister + // Lister is the lister method for a single partition. + Lister PartitionLister + + // Concurrency is the weight of the semaphore. Concurrency int64 - Partitions []Partition - state *listState - revision string - err error + + // Partitions is the set of partitions that will be concurrently queried. + Partitions []Partition + + state *listState + revision string + err error } +// PartitionLister lists objects for one partition. type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) +// Err returns the latest error encountered. func (p *ParallelPartitionLister) Err() error { return p.err } +// Revision returns the revision for the current list state. func (p *ParallelPartitionLister) Revision() string { return p.revision } +// Continue returns the encoded continue token based on the current list state. 
func (p *ParallelPartitionLister) Continue() string { if p.state == nil { return "" @@ -56,6 +69,9 @@ func indexOrZero(partitions []Partition, name string) int { return 0 } +// List returns a stream of objects up to the requested limit. +// If the continue token is not empty, it decodes it and returns the stream +// starting at the indicated marker. func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) { var state listState if resume != "" { @@ -77,14 +93,33 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st return result, nil } +// listState is a representation of the continuation point for a partial list. +// It is encoded as the continue token in the returned response. type listState struct { - Revision string `json:"r,omitempty"` + // Revision is the resourceVersion for the List object. + Revision string `json:"r,omitempty"` + + // PartitionName is the name of the partition. PartitionName string `json:"p,omitempty"` - Continue string `json:"c,omitempty"` - Offset int `json:"o,omitempty"` - Limit int `json:"l,omitempty"` + + // Continue is the continue token returned from Kubernetes for a partially filled list request. + // It is a subfield of the continue token returned from steve. + Continue string `json:"c,omitempty"` + + // Offset is the offset from the start of the list within the partition to begin the result list. + Offset int `json:"o,omitempty"` + + // Limit is the maximum number of items from all partitions to return in the result. + Limit int `json:"l,omitempty"` } +// feeder spawns a goroutine to list resources in each partition and feeds the +// results, in order by partition index, into a channel. 
+// If the sum of the results from all partitions (by namespaces or names) is +// greater than the limit parameter from the user request or the default of +// 100000, the result is truncated and a continue token is generated that +// indicates the partition and offset for the client to start on in the next +// request. func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) { var ( sem = semaphore.NewWeighted(p.Concurrency) @@ -116,6 +151,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l // setup a linked list of channel to control insertion order last = next + // state.Revision is decoded from the continue token, there won't be a revision on the first request. if state.Revision == "" { // don't have a revision yet so grab all tickets to set a revision tickets = 3 @@ -125,7 +161,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l break } - // make state local + // make state local for this partition state := state eg.Go(func() error { defer sem.Release(tickets) @@ -154,10 +190,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l p.revision = list.Revision } + // We have already seen the first objects in the list, truncate up to the offset. if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) { list.Objects = list.Objects[state.Offset:] } + // Case 1: the capacity has been reached across all goroutines but the list is still only partial, + // so save the state so that the next page can be requested later. if len(list.Objects) > capacity { result <- list.Objects[:capacity] // save state to redo this list at this offset @@ -173,10 +212,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l } result <- list.Objects capacity -= len(list.Objects) + // Case 2: all objects have been returned, we are done. 
if list.Continue == "" { return nil } - // loop again and get more data + // Case 3: we started at an offset and truncated the list to skip the objects up to the offset. + // We're not yet up to capacity and have not retrieved every object, + // so loop again and get more data. state.Continue = list.Continue state.PartitionName = partition.Name() state.Offset = 0 diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index 547eec1..4bb5340 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -9,12 +9,16 @@ import ( "golang.org/x/sync/errgroup" ) +const defaultLimit = 100000 + +// Partitioner is an interface for interacting with partitions. type Partitioner interface { Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error) Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) } +// Store implements types.Store for partitions. type Store struct { Partitioner Partitioner } @@ -28,6 +32,7 @@ func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, return s.Partitioner.Store(apiOp, p) } +// Delete deletes an object from a store. func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "delete", id) if err != nil { @@ -37,6 +42,7 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri return target.Delete(apiOp, schema, id) } +// ByID looks up a single object by its ID. 
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "get", id) if err != nil { @@ -69,12 +75,14 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche return store.List(req, schema) } +// List returns a list of objects across all applicable partitions. +// If pagination parameters are used, it returns a segment of the list. func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { var ( result types.APIObjectList ) - paritions, err := s.Partitioner.All(apiOp, schema, "list", "") + partitions, err := s.Partitioner.All(apiOp, schema, "list", "") if err != nil { return result, err } @@ -84,7 +92,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) }, Concurrency: 3, - Partitions: paritions, + Partitions: partitions, } resume := apiOp.Request.URL.Query().Get("continue") @@ -104,6 +112,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP return result, lister.Err() } +// Create creates a single object in the store. func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "create", "") if err != nil { @@ -113,6 +122,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty return target.Create(apiOp, schema, data) } +// Update updates a single object in the store. 
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "update", id) if err != nil { @@ -122,6 +132,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty return target.Update(apiOp, schema, data, id) } +// Watch returns a channel of events for a list or resource. func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { partitions, err := s.Partitioner.All(apiOp, schema, "watch", wr.ID) if err != nil { @@ -164,6 +175,9 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types return response, nil } +// getLimit extracts the limit parameter from the request or sets a default of 100000. +// Since a default is always set, this implies that clients must always be +// aware that the list may be incomplete. func getLimit(req *http.Request) int { limitString := req.URL.Query().Get("limit") limit, err := strconv.Atoi(limitString) @@ -171,7 +185,7 @@ func getLimit(req *http.Request) int { limit = 0 } if limit <= 0 { - limit = 100000 + limit = defaultLimit } return limit } diff --git a/pkg/stores/proxy/error_wrapper.go b/pkg/stores/proxy/error_wrapper.go index 4aa9fa4..01af8d1 100644 --- a/pkg/stores/proxy/error_wrapper.go +++ b/pkg/stores/proxy/error_wrapper.go @@ -11,34 +11,38 @@ type errorStore struct { types.Store } +// ByID looks up a single object by its ID. func (e *errorStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { data, err := e.Store.ByID(apiOp, schema, id) return data, translateError(err) } +// List returns a list of resources. func (e *errorStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { data, err := e.Store.List(apiOp, schema) return data, translateError(err) } +// Create creates a single object in the store. 
func (e *errorStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { data, err := e.Store.Create(apiOp, schema, data) return data, translateError(err) - } +// Update updates a single object in the store. func (e *errorStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { data, err := e.Store.Update(apiOp, schema, data, id) return data, translateError(err) - } +// Delete deletes an object from a store. func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { data, err := e.Store.Delete(apiOp, schema, id) return data, translateError(err) } +// Watch returns a channel of events for a list or resource. func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { data, err := e.Store.Watch(apiOp, schema, wr) return data, translateError(err) diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index ce9fd99..8299740 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -46,6 +46,7 @@ func init() { metav1.AddToGroupVersion(paramScheme, metav1.SchemeGroupVersion) } +// ClientGetter is a dynamic kubernetes client factory. type ClientGetter interface { IsImpersonating() bool K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error) @@ -59,15 +60,18 @@ type ClientGetter interface { TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) } +// RelationshipNotifier is an interface for handling wrangler summary.Relationship events. type RelationshipNotifier interface { OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship } +// Store implements types.Store directly on top of kubernetes. 
type Store struct { clientGetter ClientGetter notifier RelationshipNotifier } +// NewProxyStore returns a wrapped types.Store. func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { return &errorStore{ Store: &WatchRefresh{ @@ -84,6 +88,7 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo } } +// ByID looks up a single object by its ID. func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { result, err := s.byID(apiOp, schema, apiOp.Namespace, id) return toAPI(schema, result), err @@ -219,6 +224,12 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured { return result } +// ByNames filters a list of objects by an allowed set of names. +// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names, +// performing the list or watch will result in a Forbidden error, because the user does not have permission +// to list *all* resources. +// With this filter, the request can be performed successfully, and only the allowed resources will +// be returned in the list. func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) { if apiOp.Namespace == "*" { // This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat @@ -247,6 +258,7 @@ func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names return objs, nil } +// List returns a list of resources. func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) if err != nil { @@ -360,6 +372,12 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt return } +// WatchNames returns a channel of events filtered by an allowed set of names. 
+// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names, +// performing the list or watch will result in a Forbidden error, because the user does not have permission +// to list *all* resources. +// With this filter, the request can be performed successfully, and only the allowed resources will +// be returned in the watch. func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) { adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { @@ -383,6 +401,7 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t return result, nil } +// Watch returns a channel of events for a list or resource. func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { @@ -428,6 +447,7 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et return event } +// Create creates a single object in the store. func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) { var ( resp *unstructured.Unstructured @@ -468,6 +488,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params return apiObject, err } +// Update updates a single object in the store. func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) { var ( err error @@ -535,6 +556,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params return toAPI(schema, resp), nil } +// Delete deletes an object from a store. 
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { opts := metav1.DeleteOptions{} if err := decodeParams(apiOp, &opts); err != nil { diff --git a/pkg/stores/proxy/rbac_store.go b/pkg/stores/proxy/rbac_store.go index 30e6b80..1562560 100644 --- a/pkg/stores/proxy/rbac_store.go +++ b/pkg/stores/proxy/rbac_store.go @@ -18,6 +18,7 @@ var ( } ) +// Partition is an implementation of the partition.Partition interface that uses RBAC to determine how a set of resources should be segregated and accessed. type Partition struct { Namespace string All bool @@ -25,14 +26,18 @@ type Partition struct { Names sets.String } +// Name returns the name of the partition, which for this type is the namespace. func (p Partition) Name() string { return p.Namespace } +// rbacPartitioner is an implementation of the partition.Partitioner interface. type rbacPartitioner struct { proxyStore *Store } +// Lookup returns the default passthrough partition which is used only for retrieving single resources. +// Listing or watching resources requires custom partitions. func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) { switch verb { case "create": @@ -48,6 +53,9 @@ func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchem } } +// All returns a slice of partitions applicable to the API schema and the user's access level. +// For watching individual resources or for blanket access permissions, it returns the passthrough partition. +// For more granular permissions, it returns a slice of partitions matching an allowed namespace or resource names. 
func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]partition.Partition, error) { switch verb { case "list": @@ -77,6 +85,7 @@ func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, } } +// Store returns a proxy Store suited to listing and watching resources by partition. func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { return &byNameOrNamespaceStore{ Store: p.proxyStore, @@ -89,6 +98,7 @@ type byNameOrNamespaceStore struct { partition Partition } +// List returns a list of resources by partition. func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { if b.partition.Passthrough { return b.Store.List(apiOp, schema) @@ -101,6 +111,7 @@ func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.API return b.Store.ByNames(apiOp, schema, b.partition.Names) } +// Watch returns a channel of resources by partition. func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { if b.partition.Passthrough { return b.Store.Watch(apiOp, schema, wr) diff --git a/pkg/stores/proxy/watch_refresh.go b/pkg/stores/proxy/watch_refresh.go index 7674a16..b6e1929 100644 --- a/pkg/stores/proxy/watch_refresh.go +++ b/pkg/stores/proxy/watch_refresh.go @@ -9,11 +9,13 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" ) +// WatchRefresh implements types.Store with awareness of changes to the requester's access. type WatchRefresh struct { types.Store asl accesscontrol.AccessSetLookup } +// Watch performs a watch request which halts if the user's access level changes. func (w *WatchRefresh) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { user, ok := request.UserFrom(apiOp.Context()) if !ok {