From 95da447d9016afba34f064f2f64d851984d9015c Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Mon, 10 Oct 2022 11:49:05 -0700 Subject: [PATCH 1/6] cleanup: remove duplicate apiserver import Remove duplicate import and make aliasing of other apiserver imports consistent throughout steve. --- pkg/accesscontrol/access_control.go | 4 ++-- pkg/schema/collection.go | 4 ++-- pkg/server/handler/apiserver.go | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/accesscontrol/access_control.go b/pkg/accesscontrol/access_control.go index fb91a32..391cc7e 100644 --- a/pkg/accesscontrol/access_control.go +++ b/pkg/accesscontrol/access_control.go @@ -1,7 +1,7 @@ package accesscontrol import ( - "github.com/rancher/apiserver/pkg/server" + apiserver "github.com/rancher/apiserver/pkg/server" "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/attributes" "github.com/rancher/wrangler/pkg/kv" @@ -9,7 +9,7 @@ import ( ) type AccessControl struct { - server.SchemaBasedAccess + apiserver.SchemaBasedAccess } func NewAccessControl() *AccessControl { diff --git a/pkg/schema/collection.go b/pkg/schema/collection.go index 48f23ea..405b495 100644 --- a/pkg/schema/collection.go +++ b/pkg/schema/collection.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/rancher/apiserver/pkg/server" + apiserver "github.com/rancher/apiserver/pkg/server" "github.com/rancher/apiserver/pkg/types" "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/attributes" @@ -55,7 +55,7 @@ type Template struct { StoreFactory func(types.Store) types.Store } -func WrapServer(factory Factory, server *server.Server) http.Handler { +func WrapServer(factory Factory, server *apiserver.Server) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { user, ok := request.UserFrom(req.Context()) if !ok { diff --git a/pkg/server/handler/apiserver.go b/pkg/server/handler/apiserver.go index aaf1a65..e0333ac 100644 --- a/pkg/server/handler/apiserver.go +++ b/pkg/server/handler/apiserver.go @@ -3,7 +3,6 @@ package handler import ( "net/http" - "github.com/rancher/apiserver/pkg/server" apiserver "github.com/rancher/apiserver/pkg/server" "github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/urlbuilder" @@ -26,7 +25,7 @@ func New(cfg *rest.Config, sf schema.Factory, authMiddleware auth.Middleware, ne a := &apiServer{ sf: sf, - server: server.DefaultAPIServer(), + server: apiserver.DefaultAPIServer(), } a.server.AccessControl = accesscontrol.NewAccessControl() @@ -55,7 +54,7 @@ func New(cfg *rest.Config, sf schema.Factory, authMiddleware auth.Middleware, ne type apiServer struct { sf schema.Factory - server *server.Server + server *apiserver.Server } func (a *apiServer) common(rw http.ResponseWriter, req *http.Request) (*types.APIRequest, bool) { From 18afa8868fd561c16cb27afff46ded529cab65cc Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Mon, 10 Oct 2022 11:50:11 -0700 Subject: [PATCH 2/6] cleanup: Remove unused function HandlerFromConfig was not used in steve or rancher. --- pkg/proxy/proxy.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 942644d..0006e5f 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -5,7 +5,6 @@ import ( "net/url" "strings" - "github.com/rancher/wrangler/pkg/kubeconfig" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apiserver/pkg/authentication/user" @@ -14,17 +13,6 @@ import ( "k8s.io/client-go/transport" ) -// Mostly copied from "kubectl proxy" code -func HandlerFromConfig(prefix, kubeConfig string) (http.Handler, error) { - loader := kubeconfig.GetInteractiveClientConfig(kubeConfig) - cfg, err := loader.ClientConfig() - if err != nil { - return nil, err - } - - return Handler(prefix, cfg) -} - func ImpersonatingHandler(prefix string, cfg *rest.Config) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { impersonate(rw, req, prefix, cfg) From 70ea2828610cf8db956e1a89934458420d5d63f5 Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Mon, 10 Oct 2022 11:52:26 -0700 Subject: [PATCH 3/6] cleanup: Fix API root header The header defined in RFC 9110[1] is "Accept", not "Accepts". This change corrects the route filter. Since this API is not documented and the header was plainly incorrect, no attempt is made at backwards compatibility. [1] https://www.rfc-editor.org/rfc/rfc9110.html#name-accept --- pkg/server/router/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/router/router.go b/pkg/server/router/router.go index 8ad7e75..87ca48f 100644 --- a/pkg/server/router/router.go +++ b/pkg/server/router/router.go @@ -22,7 +22,7 @@ func Routes(h Handlers) http.Handler { m.StrictSlash(true) m.Use(urlbuilder.RedirectRewrite) - m.Path("/").Handler(h.APIRoot).HeadersRegexp("Accepts", ".*json.*") + m.Path("/").Handler(h.APIRoot).HeadersRegexp("Accept", ".*json.*") m.Path("/{name:v1}").Handler(h.APIRoot) m.Path("/v1/{type}").Handler(h.K8sResource) From 475f311d0bf35d581f1d262eb6896402a0a90dcf Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Mon, 10 Oct 2022 12:00:10 -0700 Subject: [PATCH 4/6] cleanup: Remove unused switchstore store Usage of the switchstore store was removed when the clusters resource was removed in dcea1db. --- pkg/stores/switchstore/store.go | 59 --------------------------------- 1 file changed, 59 deletions(-) delete mode 100644 pkg/stores/switchstore/store.go diff --git a/pkg/stores/switchstore/store.go b/pkg/stores/switchstore/store.go deleted file mode 100644 index 95dfb31..0000000 --- a/pkg/stores/switchstore/store.go +++ /dev/null @@ -1,59 +0,0 @@ -package switchstore - -import ( - "github.com/rancher/apiserver/pkg/types" -) - -type StorePicker func(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) - -type Store struct { - Picker StorePicker -} - -func (e *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { - s, err := e.Picker(apiOp, schema, "delete", id) - if err != nil { - return types.APIObject{}, err - } - return s.Delete(apiOp, schema, id) -} - -func (e *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { - s, err := e.Picker(apiOp, schema, "get", id) - if err != nil { - return types.APIObject{}, err - } - return s.ByID(apiOp, schema, id) -} - -func (e *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { - s, err := e.Picker(apiOp, schema, "list", "") - if err != nil { - return types.APIObjectList{}, err - } - return s.List(apiOp, schema) -} - -func (e *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { - s, err := e.Picker(apiOp, schema, "create", "") - if err != nil { - return types.APIObject{}, err - } - return s.Create(apiOp, schema, data) -} - -func (e *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { - s, err := e.Picker(apiOp, schema, "update", id) - if err != nil { - return types.APIObject{}, err - } - return s.Update(apiOp, schema, data, id) -} - -func (e *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { - s, err := e.Picker(apiOp, schema, "watch", "") - if err != nil { - return nil, err - } - return s.Watch(apiOp, schema, wr) -} From ea61c2187aba3b8c1a16fede2a021fcaf8d780ab Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Tue, 11 Oct 2022 12:28:51 -0700 Subject: [PATCH 5/6] cleanup: Remove unused namespace constraint The use of AddNamespaceConstraint was removed in e35b8304 of rancher/rancher, so there is no possibility of there being a namespace constraint in the request context. Remove the unused function and the unused codepath from the rbac store. --- pkg/stores/proxy/rbac_store.go | 45 ++-------------------------------- 1 file changed, 2 insertions(+), 43 deletions(-) diff --git a/pkg/stores/proxy/rbac_store.go b/pkg/stores/proxy/rbac_store.go index 31c73bc..30e6b80 100644 --- a/pkg/stores/proxy/rbac_store.go +++ b/pkg/stores/proxy/rbac_store.go @@ -1,9 +1,7 @@ package proxy import ( - "context" "fmt" - "net/http" "sort" "github.com/rancher/apiserver/pkg/types" @@ -20,19 +18,6 @@ var ( } ) -type filterKey struct{} - -func AddNamespaceConstraint(req *http.Request, names ...string) *http.Request { - set := sets.NewString(names...) - ctx := context.WithValue(req.Context(), filterKey{}, set) - return req.WithContext(ctx) -} - -func getNamespaceConstraint(req *http.Request) (sets.String, bool) { - set, ok := req.Context().Value(filterKey{}).(sets.String) - return set, ok -} - type Partition struct { Namespace string All bool @@ -128,35 +113,9 @@ func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.AP return b.Store.WatchNames(apiOp, schema, wr, b.partition.Names) } +// isPassthrough determines whether a request can be passed through directly to the underlying store +// or if the results need to be partitioned by namespace and name based on the requester's access. func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) { - partitions, passthrough := isPassthroughUnconstrained(apiOp, schema, verb) - namespaces, ok := getNamespaceConstraint(apiOp.Request) - if !ok { - return partitions, passthrough - } - - var result []partition.Partition - - if passthrough { - for namespace := range namespaces { - result = append(result, Partition{ - Namespace: namespace, - All: true, - }) - } - return result, false - } - - for _, partition := range partitions { - if namespaces.Has(partition.Name()) { - result = append(result, partition) - } - } - - return result, false -} - -func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) { accessListByVerb, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb) if accessListByVerb.All(verb) { return nil, true From b16d502fc79b8071f7dc56e8a77c6086a97e8a6f Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Fri, 14 Oct 2022 16:17:32 -0700 Subject: [PATCH 6/6] docs: Add docs for partition and proxy store --- pkg/stores/partition/parallel.go | 64 +++++++++++++++++++++++++------ pkg/stores/partition/store.go | 20 ++++++++-- pkg/stores/proxy/error_wrapper.go | 8 +++- pkg/stores/proxy/proxy_store.go | 22 +++++++++++ pkg/stores/proxy/rbac_store.go | 11 ++++++ pkg/stores/proxy/watch_refresh.go | 2 + 6 files changed, 111 insertions(+), 16 deletions(-) diff --git a/pkg/stores/partition/parallel.go b/pkg/stores/partition/parallel.go index 1b77486..4e9f3e5 100644 --- a/pkg/stores/partition/parallel.go +++ b/pkg/stores/partition/parallel.go @@ -10,29 +10,42 @@ import ( "golang.org/x/sync/semaphore" ) +// Partition represents a named grouping of kubernetes resources, +// such as by namespace or a set of names. type Partition interface { Name() string } +// ParallelPartitionLister defines how a set of partitions will be queried. type ParallelPartitionLister struct { - Lister PartitionLister + // Lister is the lister method for a single partition. + Lister PartitionLister + + // Concurrency is the weight of the semaphore. Concurrency int64 - Partitions []Partition - state *listState - revision string - err error + + // Partitions is the set of partitions that will be concurrently queried. + Partitions []Partition + + state *listState + revision string + err error } +// PartitionLister lists objects for one partition. type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) +// Err returns the latest error encountered. func (p *ParallelPartitionLister) Err() error { return p.err } +// Revision returns the revision for the current list state. func (p *ParallelPartitionLister) Revision() string { return p.revision } +// Continue returns the encoded continue token based on the current list state. func (p *ParallelPartitionLister) Continue() string { if p.state == nil { return "" @@ -56,6 +69,9 @@ func indexOrZero(partitions []Partition, name string) int { return 0 } +// List returns a stream of objects up to the requested limit. +// If the continue token is not empty, it decodes it and returns the stream +// starting at the indicated marker. func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) { var state listState if resume != "" { @@ -77,14 +93,33 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st return result, nil } +// listState is a representation of the continuation point for a partial list. +// It is encoded as the continue token in the returned response. type listState struct { - Revision string `json:"r,omitempty"` + // Revision is the resourceVersion for the List object. + Revision string `json:"r,omitempty"` + + // PartitionName is the name of the partition. PartitionName string `json:"p,omitempty"` - Continue string `json:"c,omitempty"` - Offset int `json:"o,omitempty"` - Limit int `json:"l,omitempty"` + + // Continue is the continue token returned from Kubernetes for a partially filled list request. + // It is a subfield of the continue token returned from steve. + Continue string `json:"c,omitempty"` + + // Offset is the offset from the start of the list within the partition to begin the result list. + Offset int `json:"o,omitempty"` + + // Limit is the maximum number of items from all partitions to return in the result. + Limit int `json:"l,omitempty"` } +// feeder spawns a goroutine to list resources in each partition and feeds the +// results, in order by partition index, into a channel. +// If the sum of the results from all partitions (by namespaces or names) is +// greater than the limit parameter from the user request or the default of +// 100000, the result is truncated and a continue token is generated that +// indicates the partition and offset for the client to start on in the next +// request. func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) { var ( sem = semaphore.NewWeighted(p.Concurrency) @@ -116,6 +151,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l // setup a linked list of channel to control insertion order last = next + // state.Revision is decoded from the continue token, there won't be a revision on the first request. if state.Revision == "" { // don't have a revision yet so grab all tickets to set a revision tickets = 3 @@ -125,7 +161,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l break } - // make state local + // make state local for this partition state := state eg.Go(func() error { defer sem.Release(tickets) @@ -154,10 +190,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l p.revision = list.Revision } + // We have already seen the first objects in the list, truncate up to the offset. if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) { list.Objects = list.Objects[state.Offset:] } + // Case 1: the capacity has been reached across all goroutines but the list is still only partial, + // so save the state so that the next page can be requested later. if len(list.Objects) > capacity { result <- list.Objects[:capacity] // save state to redo this list at this offset @@ -173,10 +212,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l } result <- list.Objects capacity -= len(list.Objects) + // Case 2: all objects have been returned, we are done. if list.Continue == "" { return nil } - // loop again and get more data + // Case 3: we started at an offset and truncated the list to skip the objects up to the offset. + // We're not yet up to capacity and have not retrieved every object, + // so loop again and get more data. state.Continue = list.Continue state.PartitionName = partition.Name() state.Offset = 0 diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index 547eec1..4bb5340 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -9,12 +9,16 @@ import ( "golang.org/x/sync/errgroup" ) +const defaultLimit = 100000 + +// Partitioner is an interface for interacting with partitions. type Partitioner interface { Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error) Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) } +// Store implements types.Store for partitions. type Store struct { Partitioner Partitioner } @@ -28,6 +32,7 @@ func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, return s.Partitioner.Store(apiOp, p) } +// Delete deletes an object from a store. func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "delete", id) if err != nil { @@ -37,6 +42,7 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri return target.Delete(apiOp, schema, id) } +// ByID looks up a single object by its ID. func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "get", id) if err != nil { @@ -69,12 +75,14 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche return store.List(req, schema) } +// List returns a list of objects across all applicable partitions. +// If pagination parameters are used, it returns a segment of the list. func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { var ( result types.APIObjectList ) - paritions, err := s.Partitioner.All(apiOp, schema, "list", "") + partitions, err := s.Partitioner.All(apiOp, schema, "list", "") if err != nil { return result, err } @@ -84,7 +92,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) }, Concurrency: 3, - Partitions: paritions, + Partitions: partitions, } resume := apiOp.Request.URL.Query().Get("continue") @@ -104,6 +112,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP return result, lister.Err() } +// Create creates a single object in the store. func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "create", "") if err != nil { @@ -113,6 +122,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty return target.Create(apiOp, schema, data) } +// Update updates a single object in the store. func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "update", id) if err != nil { @@ -122,6 +132,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty return target.Update(apiOp, schema, data, id) } +// Watch returns a channel of events for a list or resource. func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { partitions, err := s.Partitioner.All(apiOp, schema, "watch", wr.ID) if err != nil { @@ -164,6 +175,9 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types return response, nil } +// getLimit extracts the limit parameter from the request or sets a default of 100000. +// Since a default is always set, this implies that clients must always be +// aware that the list may be incomplete. func getLimit(req *http.Request) int { limitString := req.URL.Query().Get("limit") limit, err := strconv.Atoi(limitString) @@ -171,7 +185,7 @@ func getLimit(req *http.Request) int { limit = 0 } if limit <= 0 { - limit = 100000 + limit = defaultLimit } return limit } diff --git a/pkg/stores/proxy/error_wrapper.go b/pkg/stores/proxy/error_wrapper.go index 4aa9fa4..01af8d1 100644 --- a/pkg/stores/proxy/error_wrapper.go +++ b/pkg/stores/proxy/error_wrapper.go @@ -11,34 +11,38 @@ type errorStore struct { types.Store } +// ByID looks up a single object by its ID. func (e *errorStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { data, err := e.Store.ByID(apiOp, schema, id) return data, translateError(err) } +// List returns a list of resources. func (e *errorStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { data, err := e.Store.List(apiOp, schema) return data, translateError(err) } +// Create creates a single object in the store. func (e *errorStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { data, err := e.Store.Create(apiOp, schema, data) return data, translateError(err) - } +// Update updates a single object in the store. func (e *errorStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { data, err := e.Store.Update(apiOp, schema, data, id) return data, translateError(err) - } +// Delete deletes an object from a store. func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { data, err := e.Store.Delete(apiOp, schema, id) return data, translateError(err) } +// Watch returns a channel of events for a list or resource. func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { data, err := e.Store.Watch(apiOp, schema, wr) return data, translateError(err) diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index ce9fd99..8299740 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -46,6 +46,7 @@ func init() { metav1.AddToGroupVersion(paramScheme, metav1.SchemeGroupVersion) } +// ClientGetter is a dynamic kubernetes client factory. type ClientGetter interface { IsImpersonating() bool K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error) @@ -59,15 +60,18 @@ type ClientGetter interface { TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) } +// RelationshipNotifier is an interface for handling wrangler summary.Relationship events. type RelationshipNotifier interface { OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship } +// Store implements types.Store directly on top of kubernetes. type Store struct { clientGetter ClientGetter notifier RelationshipNotifier } +// NewProxyStore returns a wrapped types.Store. func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { return &errorStore{ Store: &WatchRefresh{ @@ -84,6 +88,7 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo } } +// ByID looks up a single object by its ID. func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { result, err := s.byID(apiOp, schema, apiOp.Namespace, id) return toAPI(schema, result), err @@ -219,6 +224,12 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured { return result } +// ByNames filters a list of objects by an allowed set of names. +// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names, +// performing the list or watch will result in a Forbidden error, because the user does not have permission +// to list *all* resources. +// With this filter, the request can be performed successfully, and only the allowed resources will +// be returned in the list. func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) { if apiOp.Namespace == "*" { // This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat @@ -247,6 +258,7 @@ func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names return objs, nil } +// List returns a list of resources. func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) if err != nil { @@ -360,6 +372,12 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt return } +// WatchNames returns a channel of events filtered by an allowed set of names. +// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names, +// performing the list or watch will result in a Forbidden error, because the user does not have permission +// to list *all* resources. +// With this filter, the request can be performed successfully, and only the allowed resources will +// be returned in watch. func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) { adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { @@ -383,6 +401,7 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t return result, nil } +// Watch returns a channel of events for a list or resource. func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { @@ -428,6 +447,7 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et return event } +// Create creates a single object in the store. func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) { var ( resp *unstructured.Unstructured @@ -468,6 +488,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params return apiObject, err } +// Update updates a single object in the store. func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) { var ( err error @@ -535,6 +556,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params return toAPI(schema, resp), nil } +// Delete deletes an object from a store. func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { opts := metav1.DeleteOptions{} if err := decodeParams(apiOp, &opts); err != nil { diff --git a/pkg/stores/proxy/rbac_store.go b/pkg/stores/proxy/rbac_store.go index 30e6b80..1562560 100644 --- a/pkg/stores/proxy/rbac_store.go +++ b/pkg/stores/proxy/rbac_store.go @@ -18,6 +18,7 @@ var ( } ) +// Partition is an implementation of the partition.Partition interface that uses RBAC to determine how a set of resources should be segregated and accessed. type Partition struct { Namespace string All bool @@ -25,14 +26,18 @@ type Partition struct { Names sets.String } +// Name returns the name of the partition, which for this type is the namespace. func (p Partition) Name() string { return p.Namespace } +// rbacPartitioner is an implementation of the partition.Partioner interface. type rbacPartitioner struct { proxyStore *Store } +// Lookup returns the default passthrough partition which is used only for retrieving single resources. +// Listing or watching resources require custom partitions. func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) { switch verb { case "create": @@ -48,6 +53,9 @@ func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchem } } +// All returns a slice of partitions applicable to the API schema and the user's access level. +// For watching individual resources or for blanket access permissions, it returns the passthrough partition. +// For more granular permissions, it returns a slice of partitions matching an allowed namespace or resource names. func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]partition.Partition, error) { switch verb { case "list": @@ -77,6 +85,7 @@ func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, } } +// Store returns a proxy Store suited to listing and watching resources by partition. func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { return &byNameOrNamespaceStore{ Store: p.proxyStore, @@ -89,6 +98,7 @@ type byNameOrNamespaceStore struct { partition Partition } +// List returns a list of resources by partition. func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { if b.partition.Passthrough { return b.Store.List(apiOp, schema) @@ -101,6 +111,7 @@ func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.API return b.Store.ByNames(apiOp, schema, b.partition.Names) } +// Watch returns a channel of resources by partition. func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { if b.partition.Passthrough { return b.Store.Watch(apiOp, schema, wr) diff --git a/pkg/stores/proxy/watch_refresh.go b/pkg/stores/proxy/watch_refresh.go index 7674a16..b6e1929 100644 --- a/pkg/stores/proxy/watch_refresh.go +++ b/pkg/stores/proxy/watch_refresh.go @@ -9,11 +9,13 @@ import ( "k8s.io/apiserver/pkg/endpoints/request" ) +// WatchRefresh implements types.Store with awareness of changes to the requester's access. type WatchRefresh struct { types.Store asl accesscontrol.AccessSetLookup } +// Watch performs a watch request which halts if the user's access level changes. func (w *WatchRefresh) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { user, ok := request.UserFrom(apiOp.Context()) if !ok {